rhmessaging commits: r3623 - in store/trunk/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2009-09-14 11:21:33 -0400 (Mon, 14 Sep 2009)
New Revision: 3623
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/python_tests/flow_to_disk.py
store/trunk/cpp/tests/run_python_tests
Log:
Reverted checkin 3622/813825 until its problems can be resolved.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2009-09-14 15:21:33 UTC (rev 3623)
@@ -252,8 +252,6 @@
#define MAX_AIO_SLEEPS 1000 // 10 sec
#define AIO_SLEEP_TIME 10000 // 10 ms
-// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
-// Throw exception for all errors.
bool
JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
{
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-09-14 15:21:33 UTC (rev 3623)
@@ -1474,15 +1474,15 @@
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
if (message->isContentReleased()) {
- jc->enqueue_extern_data_record(size, dtokp.get(), !message->isPersistent());
+ jc->enqueue_extern_data_record(size, dtokp.get(), false);
} else {
- jc->enqueue_data_record(buff, size, size, dtokp.get(), !message->isPersistent());
+ jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
} else {
if (message->isContentReleased()) {
- jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), !message->isPersistent());
+ jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
} else {
- jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), !message->isPersistent());
+ jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
}
}
} else {
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2009-09-14 15:21:33 UTC (rev 3623)
@@ -377,8 +377,7 @@
store.stage(pmsg);
//append to it
- msg->setStore(&store);
- msg->releaseContent(true);//ensure that data is not held in memory but is appended to disk when added
+ msg->releaseContent(&store);//ensure that data is not held in memory but is appended to disk when added
store.appendContent(cpmsg, data1);
store.appendContent(cpmsg, data2);
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-09-14 15:21:33 UTC (rev 3623)
@@ -34,16 +34,16 @@
# --- Helper functions ---
- def _browse(self, queueName, destinationTag, acquireMode, numMsgs, msgSize, txnConsume):
+ def _browse(self, qn, dt, am, numMsgs, msgSize, txnConsume):
txid = None
if txnConsume:
- txid = self._makeXid("consumer-xid-%s" % queueName)
+ txid = self._makeXid("consumer-xid-%s" % qn)
self.session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
- self.session.message_subscribe(queue=queueName, destination=destinationTag, acquire_mode=acquireMode)
- self.session.message_flow(destination=destinationTag, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
- self.session.message_flow(destination=destinationTag, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
- queue = self.session.incoming(destinationTag)
+ self.session.message_subscribe(queue=qn, destination=dt, acquire_mode=am)
+ self.session.message_flow(destination=dt, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+ self.session.message_flow(destination=dt, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = self.session.incoming(dt)
ids = RangedSet()
for msgNum in range(0, numMsgs):
expectedStr = self._makeMessage(msgNum, msgSize)
@@ -52,16 +52,16 @@
ids.add(msg.id)
return ids, txid
- def _checkCancel(self, queueName, destinationTag, numMsgs, ids):
+ def _checkCancel(self, qn, dt, numMsgs, ids):
self.session.message_release(ids)
- self.session.queue_declare(queue=queueName)
- self.assertEqual(numMsgs, self.session.queue_query(queue=queueName).message_count)
- self.session.message_cancel(destination=destinationTag)
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
+ self.session.message_cancel(destination=dt)
- def _checkConsume(self, queueName, acquireMode, numMsgs, ids, txid, txnConsume):
- if acquireMode == self.session.acquire_mode.not_acquired:
- self.session.queue_declare(queue=queueName)
- self.assertEqual(numMsgs, self.session.queue_query(queue=queueName).message_count)
+ def _checkConsume(self, qn, am, numMsgs, ids, txid, txnConsume):
+ if am == self.session.acquire_mode.not_acquired:
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
response = self.session.message_acquire(ids)
for range in ids:
for msgId in range:
@@ -73,15 +73,10 @@
self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
self._resetChannel()
- def _checkEmpty(self, queueName):
- self.session.queue_declare(queue=queueName)
- self.assertEqual(0, self.session.queue_query(queue=queueName).message_count)
+ def _checkEmpty(self, qn):
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(0, self.session.queue_query(queue=qn).message_count)
- def _handleSessionClose(self):
- time.sleep(1)
- self.tearDown()
- self.setUp()
-
def _makeMessage(self, msgCnt, msgSize):
msg = "Message-%04d" % msgCnt
msgLen = len(msg)
@@ -98,45 +93,19 @@
branchqual = "v%s" % self.txCounter
return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
- def _produceDirect(self, queueName, deliveryMode, numMsgs, msgSize, txnProduce):
+ def _produce(self, qn, dm, numMsgs, msgSize, txnProduce):
if txnProduce:
- txid = self._makeXid("producer-xid-%s" % queueName)
+ txid = self._makeXid("producer-xid-%s" % qn)
self.session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
for msgNum in range(0, numMsgs):
msg_str = self._makeMessage(msgNum, msgSize)
- self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=queueName, delivery_mode=deliveryMode), msg_str))
+ self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=qn, delivery_mode=dm), msg_str))
if txnProduce:
self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
self._resetChannel()
-
- def _produceFanout(self, exchangeName, deliveryMode, numMsgs, msgSize, txnProduce, fail_on_msg):
- if txnProduce:
- txid = self._makeXid("producer-xid-%s" % exchangeName)
- self.session.dtx_select()
- self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
- try:
- for msgNum in range(0, numMsgs):
- msg_str = self._makeMessage(msgNum, msgSize)
- self.session.message_transfer(destination=exchangeName, message=Message(self.session.delivery_properties(delivery_mode=deliveryMode), msg_str))
- if fail_on_msg != None and msgNum > fail_on_msg and not txnProduce:
- self.fail("Failed to throw SessionException on message_transfer %s" % fail_on_msg)
- except SessionException:
- if fail_on_msg != None and msgNum == fail_on_msg and not txnProduce:
- self._handleSessionClose()
- if txnProduce:
- self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
- self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
- try:
- self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
- except SessionException:
- if fail_on_msg != None and txnProduce:
- self._handleSessionClose()
- else:
- self.fail("Failed to throw expected SessionException on dtx_commit")
- self._resetChannel()
def _randomBool(self):
if random.randint(0, 1) > 0:
@@ -147,66 +116,109 @@
self.session.close()
self.session = self.conn.session("test-session", 1)
- # --- Simple test framework (single queue) ---
+ # --- Simple tests ---
- def simpleLimit(self, queueName, max_count = None, max_size = None, persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy = "flow_to_disk"):
- if max_count != None or max_size != None:
- queueArgs = {'qpid.policy_type':policy}
+ def test_FlowToDisk_00_SimpleMaxCount(self):
+ """Flow-to-disk tests based on setting max_count"""
+ self.simpleLimit("test_FlowToDisk_00a", max_count = 10)
+ self.simpleLimit("test_FlowToDisk_00b", max_count = 10, persistent = True)
+ self.simpleLimit("test_FlowToDisk_00c", max_count = 10, max_size = 10000000, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_FlowToDisk_00d", max_count = 10, max_size = 10000000, persistent = True, numMsgs = 100, msgSize = 10000)
+
+ def test_FlowToDisk_01_SimpleMaxSize(self):
+ """Flow-to-disk tests based on setting max_size"""
+ self.simpleLimit("test_FlowToDisk_01a", max_size = 100)
+ self.simpleLimit("test_FlowToDisk_01b", max_size = 100, persistent = True)
+ self.simpleLimit("test_FlowToDisk_01c", max_size = 100000, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_FlowToDisk_01d", max_size = 100000, persistent = True, numMsgs = 100, msgSize = 10000)
+
+ def test_FlowToDisk_02_SimpleMaxCountNotAcquired(self):
+ """Flow-to-disk tests based on setting max_count, but not using pre-acquired mode (ie browsing)"""
+ self.simpleLimit("test_FlowToDisk_02a", max_count = 10, pre_acquired = False)
+ self.simpleLimit("test_FlowToDisk_02b", max_count = 10, persistent = True, pre_acquired = False)
+ self.simpleLimit("test_FlowToDisk_02c", max_count = 10, max_size = 10000000, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_FlowToDisk_02d", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+ def test_FlowToDisk_03_SimpleMaxSizeNotAcquired(self):
+ """Flow-to-disk tests based on setting max_size, but not using pre-acquired mode (ie browsing)"""
+ self.simpleLimit("test_FlowToDisk_03a", max_size = 100, pre_acquired = False)
+ self.simpleLimit("test_FlowToDisk_03b", max_size = 100, persistent = True, pre_acquired = False)
+ self.simpleLimit("test_FlowToDisk_03c", max_size = 100, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_FlowToDisk_03d", max_size = 100, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+ def test_FlowToDisk_04_MaxSizeMaxCount(self):
+ """Flow-to-disk tests based on setting both max-count and max-size at the same time"""
+ self.simpleLimit("test_FlowToDisk_04a", max_count = 10, max_size = 1000)
+ self.simpleLimit("test_FlowToDisk_04b", max_count = 10, max_size = 1000, msgSize = 250)
+ self.simpleLimit("test_FlowToDisk_04c", max_count = 10, max_size = 1000, persistent = True)
+ self.simpleLimit("test_FlowToDisk_04d", max_count = 10, max_size = 1000, msgSize = 250, persistent = True)
+
+ def test_FlowToDisk_05_Randomized(self):
+ """Randomized flow-to-disk tests"""
+ seed = long(1000.0 * time.time())
+ print "seed=0x%x" % seed
+ random.seed(seed)
+ for i in range(0, 10):
+ self.randomLimit(i)
+
+ def simpleLimit(self, qn, max_count = None, max_size = None, persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True):
+ qa = {'qpid.policy_type':'flow_to_disk'}
if max_count != None:
- queueArgs['qpid.max_count'] = max_count
+ qa['qpid.max_count'] = max_count
if max_size != None:
- queueArgs['qpid.max_size'] = max_size
+ qa['qpid.max_size'] = max_size
if persistent:
- deliveryMode = self.session.delivery_mode.persistent
+ dm = self.session.delivery_mode.persistent
else:
- deliveryMode = self.session.delivery_mode.non_persistent
+ dm = self.session.delivery_mode.non_persistent
if pre_acquired:
- acquireMode = self.session.acquire_mode.pre_acquired
+ am = self.session.acquire_mode.pre_acquired
else:
- acquireMode = self.session.acquire_mode.not_acquired
+ am = self.session.acquire_mode.not_acquired
# Cycle through the produce/consume block transaction combinations
for i in range(0, 4):
- txnProduce = i & 1 != 0 # Transactional produce
- txnConsume = i & 2 != 0 # Transactional consume
- self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse)
+ tp = i & 1 != 0 # Transactional produce
+ tc = i & 2 != 0 # Transactional consume
+ self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
- def randomLimit(self, count, policy = "flow_to_disk"):
- queueArgs = {'qpid.policy_type':policy}
- queueName = "randomized_test_%04d" % count
+ def randomLimit(self, count):
+ qa = {'qpid.policy_type':'flow_to_disk'}
+ qn = "randomized_test_%04d" % count
# Flow to disk policy
maxCount = None
if self._randomBool():
maxCount = random.randint(0,10000)
- queueArgs['qpid.max_count'] = maxCount
+ qa['qpid.max_count'] = maxCount
maxSize = None
if self._randomBool():
maxSize = random.randint(0, 1000000)
- queueArgs['qpid.max_size'] = maxSize
+ qa['qpid.max_size'] = maxSize
# Persistence
if self._randomBool():
- deliveryMode = self.session.delivery_mode.persistent
+ dm = self.session.delivery_mode.persistent
else:
- deliveryMode = self.session.delivery_mode.non_persistent
+ dm = self.session.delivery_mode.non_persistent
# Acquired mode
if self._randomBool():
- acquireMode = self.session.acquire_mode.pre_acquired
+ am = self.session.acquire_mode.pre_acquired
browse = False
else:
- acquireMode = self.session.acquire_mode.not_acquired
+ am = self.session.acquire_mode.not_acquired
browse = self._randomBool()
numMsgs = random.randint(1, 10000)
sizeLimit = int(1000000 / numMsgs)
msgSize = random.randint(1, sizeLimit)
- txnProduce = self._randomBool()
- txnConsume = self._randomBool()
+ tp = self._randomBool()
+ tc = self._randomBool()
- self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse)
+ #print " qn=%s, qa=%s, dm=%s, am=%s, numMsgs=%d, msgSize=%d, tp=%s, tc=%s, browse=%s" % (qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
+ self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
- def _txSimpleLimit(self, queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse):
+ def txSimpleLimit(self, qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse):
"""
Test a simple case of message limits which will force flow-to-disk.
* queue_args sets a limit - either max_count and/or max_size
@@ -215,323 +227,19 @@
* Check the broker has no messages left.
"""
- self.session.queue_declare(queue=queueName, durable=True, arguments=queueArgs)
+ self.session.queue_declare(queue=qn, durable=True, arguments=qa)
- # Add messages
- self._produceDirect(queueName, deliveryMode, numMsgs, msgSize, txnProduce)
+ # --- Add messages ---
+ self._produce(qn, dm, numMsgs, msgSize, tp)
- # Browse messages (if possible)
- if acquireMode == self.session.acquire_mode.not_acquired and browse:
- dtA = "tagA-%d-%d" % (txnProduce, txnConsume)
- ids, txid = self._browse(queueName, dtA, acquireMode, numMsgs, msgSize, False)
- self._checkCancel(queueName, dtA, numMsgs, ids)
+ # --- Browse messages (if possible) ---
+ if am == self.session.acquire_mode.not_acquired and browse:
+ dtA = "tagA-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dtA, am, numMsgs, msgSize, False)
+ self._checkCancel(qn, dtA, numMsgs, ids)
- # Consume messages
- dtB = "tagB-%d-%d" % (txnProduce, txnConsume)
- ids, txid = self._browse(queueName, dtB, acquireMode, numMsgs, msgSize, txnConsume)
- self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
- self._checkEmpty(queueName)
-
- self.session.queue_delete(queue=queueName)
-
- # --- Multi-queue test framework ---
-
- def multiQueueLimit(self, queueMap, exchangeName = "amq.fanout", persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy = "flow_to_disk", fail_on_msg = None):
- if persistent:
- deliveryMode = self.session.delivery_mode.persistent
- else:
- deliveryMode = self.session.delivery_mode.non_persistent
- if pre_acquired:
- acquireMode = self.session.acquire_mode.pre_acquired
- else:
- acquireMode = self.session.acquire_mode.not_acquired
- # Cycle through the produce/consume block transaction combinations
-# TODO - re-enable full transactional testing when BZ 522499 (which crashes the broker) is fixed.
-# for i in range(0, 4):
-# txnProduce = i & 1 != 0 # Transactional produce
-# txnConsume = i & 2 != 0 # Transactional consume
-# self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse, fail_on_msg)
- for i in range(0, 2):
- txnConsume = i & 1 != 0 # Transactional consume
- self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, False, txnConsume, browse, policy, fail_on_msg)
-
- def _txMultiQueueLimit(self, queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse, policy, fail_on_msg):
- """
- Test a multi-queue case
- queueMap = queue map where map is queue name (key) against queue args (value)
- """
- self._multiQueueSetup(queueMap, exchangeName, txnProduce, txnConsume, policy)
- self._produceFanout(exchangeName, deliveryMode, numMsgs, msgSize, txnProduce, fail_on_msg)
- if fail_on_msg == None:
- # Browse messages (ie get without acquiring)
- if acquireMode == self.session.acquire_mode.not_acquired and browse:
- for queue in queueMap.iterkeys():
- deliveryTagA = "tagA-%d-%d" % (txnProduce, txnConsume)
- queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
- ids, txid = self._browse(queueName, deliveryTagA, acquireMode, numMsgs, msgSize, False)
- self._checkCancel(queueName, deliveryTagA, numMsgs, ids)
- # Consume messages
- for queue in queueMap.iterkeys():
- deliveryTagB = "tagB-%d-%d" % (txnProduce, txnConsume)
- queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
- ids, txid = self._browse(queueName, deliveryTagB, acquireMode, numMsgs, msgSize, txnConsume)
- self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
- self._checkEmpty(queueName)
- self._multiQueueTeardown(queueMap, exchangeName, txnProduce, txnConsume)
-
- def _multiQueueSetup(self, queueMap, exchangeName, txnProduce, txnConsume, policy, exclusive = True):
- for queue, qp in queueMap.iteritems():
- queueArgs = {}
- ftd = False
- for p,v in qp.iteritems():
- if p == "persistent":
- d = v == True
- elif p == "qpid.max_count":
- if v != None:
- queueArgs[p] = v
- ftd = True
- elif p == "qpid.max_size":
- if v != None:
- queueArgs[p] = v
- ftd = True
- if ftd:
- queueArgs["qpid.policy_type"] = policy
- queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
- self.session.queue_declare(queue = queueName, durable = d, arguments = queueArgs, exclusive = exclusive)
- self.session.exchange_bind(exchange = exchangeName, queue = queueName)
-
- def _multiQueueTeardown(self, queueMap, exchangeName, txnProduce, txnConsume):
- for queue in queueMap.iterkeys():
- queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
- self.session.exchange_unbind(exchange = exchangeName, queue = queueName)
- self.session.queue_delete(queue = queueName)
-
- # --- Simple tests ---
-
- def test_00_SimpleMaxCount(self):
- """Flow-to-disk tests based on setting max_count"""
- # Small msgs
- self.simpleLimit("test_00a", max_count = 10)
- self.simpleLimit("test_00b", max_count = 10, persistent = True)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_00c", max_count = 10, max_size = 10000000, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_00d", max_count = 10, max_size = 10000000, persistent = True, numMsgs = 100, msgSize = 10000)
-
- def test_01_SimpleMaxSize(self):
- """Flow-to-disk tests based on setting max_size"""
- # Small msgs
- self.simpleLimit("test_01a", max_size = 100)
- self.simpleLimit("test_01b", max_size = 100, persistent = True)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_01c", max_size = 100000, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_01d", max_size = 100000, persistent = True, numMsgs = 100, msgSize = 10000)
-
- def test_02_SimpleMaxCountNotAcquired(self):
- """Flow-to-disk tests based on setting max_count, but not using pre-acquired mode (ie browsing)"""
- # Small msgs
- self.simpleLimit("test_02a", max_count = 10, pre_acquired = False)
- self.simpleLimit("test_02b", max_count = 10, persistent = True, pre_acquired = False)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_02c", max_count = 10, max_size = 10000000, pre_acquired = False, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_02d", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
- def test_03_SimpleMaxSizeNotAcquired(self):
- """Flow-to-disk tests based on setting max_size, but not using pre-acquired mode (ie browsing)"""
- # Small msgs
- self.simpleLimit("test_03a", max_size = 100, pre_acquired = False)
- self.simpleLimit("test_03b", max_size = 100, persistent = True, pre_acquired = False)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_03c", max_size = 100, pre_acquired = False, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_03d", max_size = 100, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
- def test_04_MaxSizeMaxCount(self):
- """Flow-to-disk tests based on setting both max-count and max-size at the same time"""
- # Small msgs
- self.simpleLimit("test_04a", max_count = 10, max_size = 1000)
- self.simpleLimit("test_04b", max_count = 10, max_size = 1000, msgSize = 250)
- # Large msgs - set max_size to high number to make sure it does not interfere
- self.simpleLimit("test_04c", max_count = 10, max_size = 1000, persistent = True)
- self.simpleLimit("test_04d", max_count = 10, max_size = 1000, msgSize = 250, persistent = True)
-
- # --- Multi-queue tests ---
-
- def test_05_MultiQueueTransQueueTransMsg(self):
- """Flow-to-disk tests where both queues and messages are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_05a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_05b" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_05c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_05d" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_05e" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_05f" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap4 = {"test_05g" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
- "test_05h" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": None} }
- queueMap5 = {"test_05i" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_05j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 800} }
- queueMap6 = {"test_05k" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_05l" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
- "test_05m" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_05n" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": 800} }
- self.multiQueueLimit(queueMap1)
- self.multiQueueLimit(queueMap2, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, fail_on_msg = 8)
- self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 8)
- self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg = 8)
-
- def test_06_MultiQueueDurableQueueTransMsg(self):
- """Flow-to-disk tests where both queues are durable but messages are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_06a" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_06b" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_06c" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_06d" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_06e" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_06f" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap4 = {"test_06g" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
- "test_06h" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": None} }
- queueMap5 = {"test_06i" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_06j" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 800} }
- queueMap6 = {"test_06k" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_06l" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
- "test_06m" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_06n" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": 800} }
- self.multiQueueLimit(queueMap1)
- self.multiQueueLimit(queueMap2, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, fail_on_msg = 8)
- self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 8)
- self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg = 8)
-
-
- def test_07_MultiQueueTransQueueDurableMsg(self):
- """Flow-to-disk tests where both queues and messages are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_07a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_07b" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_07c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_07d" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_07e" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_07f" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap4 = {"test_07g" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
- "test_07h" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": None} }
- queueMap5 = {"test_07i" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_07j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 800} }
- queueMap6 = {"test_07k" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_07l" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
- "test_07m" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_07n" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": 800} }
- self.multiQueueLimit(queueMap1, persistent = True)
- self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True, fail_on_msg = 8)
-
- def test_08_MultiQueueDurableQueueDurableMsg(self):
- """Flow-to-disk tests where both queues are durable but messages are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_08a" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_08b" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_08c" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_08d" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_08e" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_08f" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap4 = {"test_08g" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
- "test_08h" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": None} }
- queueMap5 = {"test_08i" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_08j" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 800} }
- queueMap6 = {"test_08k" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_08l" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
- "test_08m" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_08n" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": 800} }
- self.multiQueueLimit(queueMap1, persistent = True)
- self.multiQueueLimit(queueMap2, persistent = True)
- self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True)
- self.multiQueueLimit(queueMap4, persistent = True)
- self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True)
- self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True)
-
- def test_09_MultiQueueMixedQueueTransMsg(self):
- """Flow-to-disk tests where both queues are durable but messages are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_09a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_09b" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_09c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_09d" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_09e" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_09f" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None} }
- queueMap4 = {"test_09g" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_09h" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap5 = {"test_09i" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_09j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap6 = {"test_09k" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
- "test_09l" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": None} }
- queueMap7 = {"test_09m" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
- "test_09n" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": None} }
- queueMap8 = {"test_09o" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_09p" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 800} }
- queueMap9 = {"test_09q" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_09r" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 800} }
- queueMap10 = {"test_09s" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_09t" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
- "test_09u" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_09v" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": 800},
- "test_09w" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_09x" : {"persistent":True, "qpid.max_count":12, "qpid.max_size": None},
- "test_09y" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1100},
- "test_09z" : {"persistent":True, "qpid.max_count":7, "qpid.max_size": 900} }
- self.multiQueueLimit(queueMap1)
- self.multiQueueLimit(queueMap2, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, msgSize = 100, fail_on_msg = 10)
- self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 10)
- self.multiQueueLimit(queueMap6, fail_on_msg = 8)
- self.multiQueueLimit(queueMap7, fail_on_msg = 8)
- self.multiQueueLimit(queueMap8, msgSize = 100, fail_on_msg = 8)
- self.multiQueueLimit(queueMap9, msgSize = 100, fail_on_msg = 8)
- self.multiQueueLimit(queueMap10, msgSize = 100, fail_on_msg = 7)
-
- def test_10_MultiQueueMixedQueueDurableMsg(self):
- """Flow-to-disk tests where both queues are durable but messages are transient and messages are routed to more than one queue"""
- queueMap1 = {"test_10a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_10b" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None} }
- queueMap2 = {"test_10c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_10d" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None} }
- queueMap3 = {"test_10e" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_10f" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None} }
- queueMap4 = {"test_10g" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_10h" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap5 = {"test_10i" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_10j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
- queueMap6 = {"test_10k" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
- "test_10l" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": None} }
- queueMap7 = {"test_10m" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
- "test_10n" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": None} }
- queueMap8 = {"test_10o" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_10p" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 800} }
- queueMap9 = {"test_10q" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_10r" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 800} }
- queueMap10 = {"test_10s" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
- "test_10t" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
- "test_10u" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
- "test_10v" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": 800},
- "test_10w" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
- "test_10x" : {"persistent":True, "qpid.max_count":12, "qpid.max_size": None},
- "test_10y" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1100},
- "test_10z" : {"persistent":True, "qpid.max_count":7, "qpid.max_size": 900} }
- self.multiQueueLimit(queueMap1, persistent = True)
- self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap3, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap4, msgSize = 100, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg = 10)
- self.multiQueueLimit(queueMap6, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap7, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap8, msgSize = 100, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap9, msgSize = 100, persistent = True, fail_on_msg = 8)
- self.multiQueueLimit(queueMap10, msgSize = 100, persistent = True, fail_on_msg = 7)
-
- # --- Long and randomized tests ---
-
- def test_11_Randomized(self):
- """Randomized flow-to-disk tests"""
- seed = long(1000.0 * time.time())
- print "seed=0x%x" % seed
- random.seed(seed)
- for i in range(0, 10):
- self.randomLimit(i)
+ # --- Consume messages ---
+ dtB = "tagB-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dtB, am, numMsgs, msgSize, tc)
+ self._checkConsume(qn, am, numMsgs, ids, txid, tc)
+ self._checkEmpty(qn)
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2009-09-11 13:35:04 UTC (rev 3622)
+++ store/trunk/cpp/tests/run_python_tests 2009-09-14 15:21:33 UTC (rev 3623)
@@ -78,12 +78,10 @@
start_broker() {
${QPIDD} --daemon --port 0 --no-module-dir --load-module=${STORE_LIB} --data-dir=${STORE_DIR} --auth=no --log-enable info+ --log-to-file ${STORE_DIR}/broker.python-test.log > qpidd.port
LOCAL_PORT=`cat qpidd.port`
- echo "run_python_tests: Started qpidd on port ${LOCAL_PORT}"
}
stop_broker() {
- echo "run_python_tests: Stopping broker on port ${LOCAL_PORT}"
- ${QPIDD} -q --port ${LOCAL_PORT}
+ ${QPIDD} -q --port $LOCAL_PORT
}
fail=0
rhmessaging commits: r3622 - in store/trunk/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2009-09-11 09:35:04 -0400 (Fri, 11 Sep 2009)
New Revision: 3622
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/python_tests/flow_to_disk.py
store/trunk/cpp/tests/run_python_tests
Log:
Joint checkin with cctrieloff to correct multi-queue flow-to-disk behavior. Syncs with qpid r813825.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2009-09-11 13:35:04 UTC (rev 3622)
@@ -252,6 +252,8 @@
#define MAX_AIO_SLEEPS 1000 // 10 sec
#define AIO_SLEEP_TIME 10000 // 10 ms
+// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
+// Throw exception for all errors.
bool
JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
{
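The added comment above documents the loadMsgContent() contract: true means the journal itself holds the content, false means the content is external, and all errors surface as exceptions. A minimal caller-side sketch of that contract follows; the store headers and namespaces are elided, and ExternalStore/fetch() are purely hypothetical stand-ins for whatever external store holds released content.
#include <string>
// Hypothetical stand-in for "an external store" holding released content.
struct ExternalStore {
    std::string fetch(u_int64_t rid, size_t length, size_t offset);
};
std::string recoverContent(JournalImpl& journal, ExternalStore& ext,
                           u_int64_t rid, size_t length, size_t offset)
{
    std::string data;
    if (journal.loadMsgContent(rid, data, length, offset))
        return data;                       // content recovered from the store
    return ext.fetch(rid, length, offset); // external: recover it elsewhere
    // loadMsgContent() throws on all errors, so there is no status to check.
}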
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-09-11 13:35:04 UTC (rev 3622)
@@ -1474,15 +1474,15 @@
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
if (message->isContentReleased()) {
- jc->enqueue_extern_data_record(size, dtokp.get(), false);
+ jc->enqueue_extern_data_record(size, dtokp.get(), !message->isPersistent());
} else {
- jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+ jc->enqueue_data_record(buff, size, size, dtokp.get(), !message->isPersistent());
}
} else {
if (message->isContentReleased()) {
- jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+ jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), !message->isPersistent());
} else {
- jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+ jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), !message->isPersistent());
}
}
} else {
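Restated, the four enqueue_* calls above differ along two axes only: whether the enqueue is transactional (non-empty XID) and whether the message content has already been released to disk. The final boolean argument is the journal's transient flag, which this commit derives from message persistence instead of passing a hard-coded false. A sketch of that dispatch factored into one helper; enqueueToJournal, the Token type parameter, and buff's const char* type are illustrative assumptions, while the enqueue_* calls and their argument order are as in the diff.
// Token stands in for the data-token type behind dtokp.get().
template <typename Token>
void enqueueToJournal(JournalImpl* jc, const char* buff, size_t size,
                      Token* dtokp, const std::string& xid,
                      bool contentReleased, bool persistent)
{
    const bool transient = !persistent; // r3622: transient iff non-persistent
    if (xid.empty()) {                  // non-transactional enqueue
        if (contentReleased)
            jc->enqueue_extern_data_record(size, dtokp, transient);
        else
            jc->enqueue_data_record(buff, size, size, dtokp, transient);
    } else {                            // transactional enqueue under xid
        if (contentReleased)
            jc->enqueue_extern_txn_data_record(size, dtokp, xid, transient);
        else
            jc->enqueue_txn_data_record(buff, size, size, dtokp, xid, transient);
    }
}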
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2009-09-11 13:35:04 UTC (rev 3622)
@@ -377,7 +377,8 @@
store.stage(pmsg);
//append to it
- msg->releaseContent(&store);//ensure that data is not held in memory but is appended to disk when added
+ msg->setStore(&store);
+ msg->releaseContent(true);//ensure that data is not held in memory but is appended to disk when added
store.appendContent(cpmsg, data1);
store.appendContent(cpmsg, data2);
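Side by side, the qpid r813825 API change this hunk tracks looks as follows (a sketch only; msg and store are as declared in the surrounding test):
// Before: one call both attached the store and released the content.
msg->releaseContent(&store);
// After (this commit): the two steps are explicit.
msg->setStore(&store);       // attach the store first...
msg->releaseContent(true);   // ...then release content so that subsequent
                             // appendContent() calls go to disk, not memory.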
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-09-11 13:35:04 UTC (rev 3622)
@@ -34,16 +34,16 @@
# --- Helper functions ---
- def _browse(self, qn, dt, am, numMsgs, msgSize, txnConsume):
+ def _browse(self, queueName, destinationTag, acquireMode, numMsgs, msgSize, txnConsume):
txid = None
if txnConsume:
- txid = self._makeXid("consumer-xid-%s" % qn)
+ txid = self._makeXid("consumer-xid-%s" % queueName)
self.session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
- self.session.message_subscribe(queue=qn, destination=dt, acquire_mode=am)
- self.session.message_flow(destination=dt, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
- self.session.message_flow(destination=dt, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
- queue = self.session.incoming(dt)
+ self.session.message_subscribe(queue=queueName, destination=destinationTag, acquire_mode=acquireMode)
+ self.session.message_flow(destination=destinationTag, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+ self.session.message_flow(destination=destinationTag, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = self.session.incoming(destinationTag)
ids = RangedSet()
for msgNum in range(0, numMsgs):
expectedStr = self._makeMessage(msgNum, msgSize)
@@ -52,16 +52,16 @@
ids.add(msg.id)
return ids, txid
- def _checkCancel(self, qn, dt, numMsgs, ids):
+ def _checkCancel(self, queueName, destinationTag, numMsgs, ids):
self.session.message_release(ids)
- self.session.queue_declare(queue=qn)
- self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
- self.session.message_cancel(destination=dt)
+ self.session.queue_declare(queue=queueName)
+ self.assertEqual(numMsgs, self.session.queue_query(queue=queueName).message_count)
+ self.session.message_cancel(destination=destinationTag)
- def _checkConsume(self, qn, am, numMsgs, ids, txid, txnConsume):
- if am == self.session.acquire_mode.not_acquired:
- self.session.queue_declare(queue=qn)
- self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
+ def _checkConsume(self, queueName, acquireMode, numMsgs, ids, txid, txnConsume):
+ if acquireMode == self.session.acquire_mode.not_acquired:
+ self.session.queue_declare(queue=queueName)
+ self.assertEqual(numMsgs, self.session.queue_query(queue=queueName).message_count)
response = self.session.message_acquire(ids)
for range in ids:
for msgId in range:
@@ -73,10 +73,15 @@
self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
self._resetChannel()
- def _checkEmpty(self, qn):
- self.session.queue_declare(queue=qn)
- self.assertEqual(0, self.session.queue_query(queue=qn).message_count)
+ def _checkEmpty(self, queueName):
+ self.session.queue_declare(queue=queueName)
+ self.assertEqual(0, self.session.queue_query(queue=queueName).message_count)
+ def _handleSessionClose(self):
+ time.sleep(1)
+ self.tearDown()
+ self.setUp()
+
def _makeMessage(self, msgCnt, msgSize):
msg = "Message-%04d" % msgCnt
msgLen = len(msg)
@@ -93,19 +98,45 @@
branchqual = "v%s" % self.txCounter
return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
- def _produce(self, qn, dm, numMsgs, msgSize, txnProduce):
+ def _produceDirect(self, queueName, deliveryMode, numMsgs, msgSize, txnProduce):
if txnProduce:
- txid = self._makeXid("producer-xid-%s" % qn)
+ txid = self._makeXid("producer-xid-%s" % queueName)
self.session.dtx_select()
self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
for msgNum in range(0, numMsgs):
msg_str = self._makeMessage(msgNum, msgSize)
- self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=qn, delivery_mode=dm), msg_str))
+ self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=queueName, delivery_mode=deliveryMode), msg_str))
if txnProduce:
self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
self._resetChannel()
+
+ def _produceFanout(self, exchangeName, deliveryMode, numMsgs, msgSize, txnProduce, fail_on_msg):
+ if txnProduce:
+ txid = self._makeXid("producer-xid-%s" % exchangeName)
+ self.session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+ try:
+ for msgNum in range(0, numMsgs):
+ msg_str = self._makeMessage(msgNum, msgSize)
+ self.session.message_transfer(destination=exchangeName, message=Message(self.session.delivery_properties(delivery_mode=deliveryMode), msg_str))
+ if fail_on_msg != None and msgNum > fail_on_msg and not txnProduce:
+ self.fail("Failed to throw SessionException on message_transfer %s" % fail_on_msg)
+ except SessionException:
+ if fail_on_msg != None and msgNum == fail_on_msg and not txnProduce:
+ self._handleSessionClose()
+ if txnProduce:
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+ try:
+ self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
+ except SessionException:
+ if fail_on_msg != None and txnProduce:
+ self._handleSessionClose()
+ else:
+ self.fail("Failed to throw expected SessionException on dtx_commit")
+ self._resetChannel()
def _randomBool(self):
if random.randint(0, 1) > 0:
@@ -116,109 +147,66 @@
self.session.close()
self.session = self.conn.session("test-session", 1)
- # --- Simple tests ---
+ # --- Simple test framework (single queue) ---
- def test_FlowToDisk_00_SimpleMaxCount(self):
- """Flow-to-disk tests based on setting max_count"""
- self.simpleLimit("test_FlowToDisk_00a", max_count = 10)
- self.simpleLimit("test_FlowToDisk_00b", max_count = 10, persistent = True)
- self.simpleLimit("test_FlowToDisk_00c", max_count = 10, max_size = 10000000, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_FlowToDisk_00d", max_count = 10, max_size = 10000000, persistent = True, numMsgs = 100, msgSize = 10000)
-
- def test_FlowToDisk_01_SimpleMaxSize(self):
- """Flow-to-disk tests based on setting max_size"""
- self.simpleLimit("test_FlowToDisk_01a", max_size = 100)
- self.simpleLimit("test_FlowToDisk_01b", max_size = 100, persistent = True)
- self.simpleLimit("test_FlowToDisk_01c", max_size = 100000, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_FlowToDisk_01d", max_size = 100000, persistent = True, numMsgs = 100, msgSize = 10000)
-
- def test_FlowToDisk_02_SimpleMaxCountNotAcquired(self):
- """Flow-to-disk tests based on setting max_count, but not using pre-acquired mode (ie browsing)"""
- self.simpleLimit("test_FlowToDisk_02a", max_count = 10, pre_acquired = False)
- self.simpleLimit("test_FlowToDisk_02b", max_count = 10, persistent = True, pre_acquired = False)
- self.simpleLimit("test_FlowToDisk_02c", max_count = 10, max_size = 10000000, pre_acquired = False, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_FlowToDisk_02d", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
- def test_FlowToDisk_03_SimpleMaxSizeNotAcquired(self):
- """Flow-to-disk tests based on setting max_size, but not using pre-acquired mode (ie browsing)"""
- self.simpleLimit("test_FlowToDisk_03a", max_size = 100, pre_acquired = False)
- self.simpleLimit("test_FlowToDisk_03b", max_size = 100, persistent = True, pre_acquired = False)
- self.simpleLimit("test_FlowToDisk_03c", max_size = 100, pre_acquired = False, numMsgs = 100, msgSize = 10000)
- self.simpleLimit("test_FlowToDisk_03d", max_size = 100, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
- def test_FlowToDisk_04_MaxSizeMaxCount(self):
- """Flow-to-disk tests based on setting both max-count and max-size at the same time"""
- self.simpleLimit("test_FlowToDisk_04a", max_count = 10, max_size = 1000)
- self.simpleLimit("test_FlowToDisk_04b", max_count = 10, max_size = 1000, msgSize = 250)
- self.simpleLimit("test_FlowToDisk_04c", max_count = 10, max_size = 1000, persistent = True)
- self.simpleLimit("test_FlowToDisk_04d", max_count = 10, max_size = 1000, msgSize = 250, persistent = True)
-
- def test_FlowToDisk_05_Randomized(self):
- """Randomized flow-to-disk tests"""
- seed = long(1000.0 * time.time())
- print "seed=0x%x" % seed
- random.seed(seed)
- for i in range(0, 10):
- self.randomLimit(i)
-
- def simpleLimit(self, qn, max_count = None, max_size = None, persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True):
- qa = {'qpid.policy_type':'flow_to_disk'}
+ def simpleLimit(self, queueName, max_count = None, max_size = None, persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy = "flow_to_disk"):
+ if max_count != None or max_size != None:
+ queueArgs = {'qpid.policy_type':policy}
if max_count != None:
- qa['qpid.max_count'] = max_count
+ queueArgs['qpid.max_count'] = max_count
if max_size != None:
- qa['qpid.max_size'] = max_size
+ queueArgs['qpid.max_size'] = max_size
if persistent:
- dm = self.session.delivery_mode.persistent
+ deliveryMode = self.session.delivery_mode.persistent
else:
- dm = self.session.delivery_mode.non_persistent
+ deliveryMode = self.session.delivery_mode.non_persistent
if pre_acquired:
- am = self.session.acquire_mode.pre_acquired
+ acquireMode = self.session.acquire_mode.pre_acquired
else:
- am = self.session.acquire_mode.not_acquired
+ acquireMode = self.session.acquire_mode.not_acquired
# Cycle through the produce/consume block transaction combinations
for i in range(0, 4):
- tp = i & 1 != 0 # Transactional produce
- tc = i & 2 != 0 # Transactional consume
- self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
+ txnProduce = i & 1 != 0 # Transactional produce
+ txnConsume = i & 2 != 0 # Transactional consume
+ self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse)
- def randomLimit(self, count):
- qa = {'qpid.policy_type':'flow_to_disk'}
- qn = "randomized_test_%04d" % count
+ def randomLimit(self, count, policy = "flow_to_disk"):
+ queueArgs = {'qpid.policy_type':policy}
+ queueName = "randomized_test_%04d" % count
# Flow to disk policy
maxCount = None
if self._randomBool():
maxCount = random.randint(0,10000)
- qa['qpid.max_count'] = maxCount
+ queueArgs['qpid.max_count'] = maxCount
maxSize = None
if self._randomBool():
maxSize = random.randint(0, 1000000)
- qa['qpid.max_size'] = maxSize
+ queueArgs['qpid.max_size'] = maxSize
# Persistence
if self._randomBool():
- dm = self.session.delivery_mode.persistent
+ deliveryMode = self.session.delivery_mode.persistent
else:
- dm = self.session.delivery_mode.non_persistent
+ deliveryMode = self.session.delivery_mode.non_persistent
# Acquired mode
if self._randomBool():
- am = self.session.acquire_mode.pre_acquired
+ acquireMode = self.session.acquire_mode.pre_acquired
browse = False
else:
- am = self.session.acquire_mode.not_acquired
+ acquireMode = self.session.acquire_mode.not_acquired
browse = self._randomBool()
numMsgs = random.randint(1, 10000)
sizeLimit = int(1000000 / numMsgs)
msgSize = random.randint(1, sizeLimit)
- tp = self._randomBool()
- tc = self._randomBool()
+ txnProduce = self._randomBool()
+ txnConsume = self._randomBool()
- #print " qn=%s, qa=%s, dm=%s, am=%s, numMsgs=%d, msgSize=%d, tp=%s, tc=%s, browse=%s" % (qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
- self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
+ self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse)
- def txSimpleLimit(self, qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse):
+ def _txSimpleLimit(self, queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse):
"""
Test a simple case of message limits which will force flow-to-disk.
* queue_args sets a limit - either max_count and/or max_size
@@ -227,19 +215,323 @@
* Check the broker has no messages left.
"""
- self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+ self.session.queue_declare(queue=queueName, durable=True, arguments=queueArgs)
- # --- Add messages ---
- self._produce(qn, dm, numMsgs, msgSize, tp)
+ # Add messages
+ self._produceDirect(queueName, deliveryMode, numMsgs, msgSize, txnProduce)
- # --- Browse messages (if possible) ---
- if am == self.session.acquire_mode.not_acquired and browse:
- dtA = "tagA-%d-%d" % (tp, tc)
- ids, txid = self._browse(qn, dtA, am, numMsgs, msgSize, False)
- self._checkCancel(qn, dtA, numMsgs, ids)
+ # Browse messages (if possible)
+ if acquireMode == self.session.acquire_mode.not_acquired and browse:
+ dtA = "tagA-%d-%d" % (txnProduce, txnConsume)
+ ids, txid = self._browse(queueName, dtA, acquireMode, numMsgs, msgSize, False)
+ self._checkCancel(queueName, dtA, numMsgs, ids)
- # --- Consume messages ---
- dtB = "tagB-%d-%d" % (tp, tc)
- ids, txid = self._browse(qn, dtB, am, numMsgs, msgSize, tc)
- self._checkConsume(qn, am, numMsgs, ids, txid, tc)
- self._checkEmpty(qn)
+ # Consume messages
+ dtB = "tagB-%d-%d" % (txnProduce, txnConsume)
+ ids, txid = self._browse(queueName, dtB, acquireMode, numMsgs, msgSize, txnConsume)
+ self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
+ self._checkEmpty(queueName)
+
+ self.session.queue_delete(queue=queueName)
+
+ # --- Multi-queue test framework ---
+
+ def multiQueueLimit(self, queueMap, exchangeName = "amq.fanout", persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy = "flow_to_disk", fail_on_msg = None):
+ if persistent:
+ deliveryMode = self.session.delivery_mode.persistent
+ else:
+ deliveryMode = self.session.delivery_mode.non_persistent
+ if pre_acquired:
+ acquireMode = self.session.acquire_mode.pre_acquired
+ else:
+ acquireMode = self.session.acquire_mode.not_acquired
+ # Cycle through the produce/consume block transaction combinations
+# TODO - re-enable full transactional testing when BZ 522499 (which crashes the broker) is fixed.
+# for i in range(0, 4):
+# txnProduce = i & 1 != 0 # Transactional produce
+# txnConsume = i & 2 != 0 # Transactional consume
+# self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse, fail_on_msg)
+ for i in range(0, 2):
+ txnConsume = i & 1 != 0 # Transactional consume
+ self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, False, txnConsume, browse, policy, fail_on_msg)
+
+ def _txMultiQueueLimit(self, queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse, policy, fail_on_msg):
+ """
+ Test a multi-queue case
+ queueMap = queue map where map is queue name (key) against queue args (value)
+ """
+ self._multiQueueSetup(queueMap, exchangeName, txnProduce, txnConsume, policy)
+ self._produceFanout(exchangeName, deliveryMode, numMsgs, msgSize, txnProduce, fail_on_msg)
+ if fail_on_msg == None:
+ # Browse messages (ie get without acquiring)
+ if acquireMode == self.session.acquire_mode.not_acquired and browse:
+ for queue in queueMap.iterkeys():
+ deliveryTagA = "tagA-%d-%d" % (txnProduce, txnConsume)
+ queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+ ids, txid = self._browse(queueName, deliveryTagA, acquireMode, numMsgs, msgSize, False)
+ self._checkCancel(queueName, deliveryTagA, numMsgs, ids)
+ # Consume messages
+ for queue in queueMap.iterkeys():
+ deliveryTagB = "tagB-%d-%d" % (txnProduce, txnConsume)
+ queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+ ids, txid = self._browse(queueName, deliveryTagB, acquireMode, numMsgs, msgSize, txnConsume)
+ self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
+ self._checkEmpty(queueName)
+ self._multiQueueTeardown(queueMap, exchangeName, txnProduce, txnConsume)
+
+ def _multiQueueSetup(self, queueMap, exchangeName, txnProduce, txnConsume, policy, exclusive = True):
+ for queue, qp in queueMap.iteritems():
+ queueArgs = {}
+ ftd = False
+ for p,v in qp.iteritems():
+ if p == "persistent":
+ d = v == True
+ elif p == "qpid.max_count":
+ if v != None:
+ queueArgs[p] = v
+ ftd = True
+ elif p == "qpid.max_size":
+ if v != None:
+ queueArgs[p] = v
+ ftd = True
+ if ftd:
+ queueArgs["qpid.policy_type"] = policy
+ queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+ self.session.queue_declare(queue = queueName, durable = d, arguments = queueArgs, exclusive = exclusive)
+ self.session.exchange_bind(exchange = exchangeName, queue = queueName)
+
+ def _multiQueueTeardown(self, queueMap, exchangeName, txnProduce, txnConsume):
+ for queue in queueMap.iterkeys():
+ queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+ self.session.exchange_unbind(exchange = exchangeName, queue = queueName)
+ self.session.queue_delete(queue = queueName)
+
+ # --- Simple tests ---
+
+ def test_00_SimpleMaxCount(self):
+ """Flow-to-disk tests based on setting max_count"""
+ # Small msgs
+ self.simpleLimit("test_00a", max_count = 10)
+ self.simpleLimit("test_00b", max_count = 10, persistent = True)
+ # Large msgs - set max_size to high number to make sure it does not interfere
+ self.simpleLimit("test_00c", max_count = 10, max_size = 10000000, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_00d", max_count = 10, max_size = 10000000, persistent = True, numMsgs = 100, msgSize = 10000)
+
+ def test_01_SimpleMaxSize(self):
+ """Flow-to-disk tests based on setting max_size"""
+ # Small msgs
+ self.simpleLimit("test_01a", max_size = 100)
+ self.simpleLimit("test_01b", max_size = 100, persistent = True)
+        # Large msgs - max_size scaled up so the limit still triggers with the larger messages
+ self.simpleLimit("test_01c", max_size = 100000, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_01d", max_size = 100000, persistent = True, numMsgs = 100, msgSize = 10000)
+
+ def test_02_SimpleMaxCountNotAcquired(self):
+        """Flow-to-disk tests based on setting max_count, but not using pre-acquired mode (i.e. browsing)"""
+ # Small msgs
+ self.simpleLimit("test_02a", max_count = 10, pre_acquired = False)
+ self.simpleLimit("test_02b", max_count = 10, persistent = True, pre_acquired = False)
+ # Large msgs - set max_size to high number to make sure it does not interfere
+ self.simpleLimit("test_02c", max_count = 10, max_size = 10000000, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_02d", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+ def test_03_SimpleMaxSizeNotAcquired(self):
+        """Flow-to-disk tests based on setting max_size, but not using pre-acquired mode (i.e. browsing)"""
+ # Small msgs
+ self.simpleLimit("test_03a", max_size = 100, pre_acquired = False)
+ self.simpleLimit("test_03b", max_size = 100, persistent = True, pre_acquired = False)
+        # Large msgs
+ self.simpleLimit("test_03c", max_size = 100, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+ self.simpleLimit("test_03d", max_size = 100, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+ def test_04_MaxSizeMaxCount(self):
+ """Flow-to-disk tests based on setting both max-count and max-size at the same time"""
+ # Small msgs
+ self.simpleLimit("test_04a", max_count = 10, max_size = 1000)
+ self.simpleLimit("test_04b", max_count = 10, max_size = 1000, msgSize = 250)
+        # Persistent msgs
+ self.simpleLimit("test_04c", max_count = 10, max_size = 1000, persistent = True)
+ self.simpleLimit("test_04d", max_count = 10, max_size = 1000, msgSize = 250, persistent = True)
+
+ # --- Multi-queue tests ---
+
+ def test_05_MultiQueueTransQueueTransMsg(self):
+ """Flow-to-disk tests where both queues and messages are transient and messages are routed to more than one queue"""
+ queueMap1 = {"test_05a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_05b" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_05c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_05d" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_05e" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_05f" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap4 = {"test_05g" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
+ "test_05h" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": None} }
+ queueMap5 = {"test_05i" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_05j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap6 = {"test_05k" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_05l" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
+ "test_05m" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_05n" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": 800} }
+ self.multiQueueLimit(queueMap1)
+ self.multiQueueLimit(queueMap2, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg = 8)
+
+ def test_06_MultiQueueDurableQueueTransMsg(self):
+ """Flow-to-disk tests where both queues are durable but messages are transient and messages are routed to more than one queue"""
+ queueMap1 = {"test_06a" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_06b" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_06c" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_06d" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_06e" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_06f" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap4 = {"test_06g" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
+ "test_06h" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": None} }
+ queueMap5 = {"test_06i" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_06j" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap6 = {"test_06k" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_06l" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
+ "test_06m" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_06n" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": 800} }
+ self.multiQueueLimit(queueMap1)
+ self.multiQueueLimit(queueMap2, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg = 8)
+
+    def test_07_MultiQueueTransQueueDurableMsg(self):
+        """Flow-to-disk tests where queues are transient but messages are durable and messages are routed to more than one queue"""
+ queueMap1 = {"test_07a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_07b" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_07c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_07d" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_07e" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_07f" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap4 = {"test_07g" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
+ "test_07h" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": None} }
+ queueMap5 = {"test_07i" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_07j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap6 = {"test_07k" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_07l" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
+ "test_07m" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_07n" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": 800} }
+ self.multiQueueLimit(queueMap1, persistent = True)
+ self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True, fail_on_msg = 8)
+
+ def test_08_MultiQueueDurableQueueDurableMsg(self):
+        """Flow-to-disk tests where both queues and messages are durable and messages are routed to more than one queue"""
+ queueMap1 = {"test_08a" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_08b" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_08c" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_08d" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_08e" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_08f" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap4 = {"test_08g" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
+ "test_08h" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": None} }
+ queueMap5 = {"test_08i" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_08j" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap6 = {"test_08k" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_08l" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
+ "test_08m" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_08n" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": 800} }
+ self.multiQueueLimit(queueMap1, persistent = True)
+ self.multiQueueLimit(queueMap2, persistent = True)
+ self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True)
+ self.multiQueueLimit(queueMap4, persistent = True)
+ self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True)
+ self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True)
+
+ def test_09_MultiQueueMixedQueueTransMsg(self):
+        """Flow-to-disk tests where a mix of transient and durable queues receive transient messages routed to more than one queue"""
+ queueMap1 = {"test_09a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_09b" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_09c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_09d" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_09e" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_09f" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None} }
+ queueMap4 = {"test_09g" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_09h" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap5 = {"test_09i" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_09j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap6 = {"test_09k" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
+ "test_09l" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": None} }
+ queueMap7 = {"test_09m" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
+ "test_09n" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": None} }
+ queueMap8 = {"test_09o" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_09p" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap9 = {"test_09q" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_09r" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap10 = {"test_09s" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_09t" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
+ "test_09u" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_09v" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": 800},
+ "test_09w" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_09x" : {"persistent":True, "qpid.max_count":12, "qpid.max_size": None},
+ "test_09y" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1100},
+ "test_09z" : {"persistent":True, "qpid.max_count":7, "qpid.max_size": 900} }
+ self.multiQueueLimit(queueMap1)
+ self.multiQueueLimit(queueMap2, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, msgSize = 100, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap6, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap7, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap8, msgSize = 100, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap9, msgSize = 100, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap10, msgSize = 100, fail_on_msg = 7)
+
+ def test_10_MultiQueueMixedQueueDurableMsg(self):
+        """Flow-to-disk tests where a mix of transient and durable queues receive durable messages routed to more than one queue"""
+ queueMap1 = {"test_10a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_10b" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None} }
+ queueMap2 = {"test_10c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_10d" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None} }
+ queueMap3 = {"test_10e" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_10f" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None} }
+ queueMap4 = {"test_10g" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_10h" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap5 = {"test_10i" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_10j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
+ queueMap6 = {"test_10k" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
+ "test_10l" : {"persistent":True, "qpid.max_count":8, "qpid.max_size": None} }
+ queueMap7 = {"test_10m" : {"persistent":True, "qpid.max_count":10, "qpid.max_size": None},
+ "test_10n" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": None} }
+ queueMap8 = {"test_10o" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_10p" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap9 = {"test_10q" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_10r" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 800} }
+ queueMap10 = {"test_10s" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+ "test_10t" : {"persistent":False, "qpid.max_count":10, "qpid.max_size": None},
+ "test_10u" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+ "test_10v" : {"persistent":False, "qpid.max_count":8, "qpid.max_size": 800},
+ "test_10w" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": None},
+ "test_10x" : {"persistent":True, "qpid.max_count":12, "qpid.max_size": None},
+ "test_10y" : {"persistent":True, "qpid.max_count":None, "qpid.max_size": 1100},
+ "test_10z" : {"persistent":True, "qpid.max_count":7, "qpid.max_size": 900} }
+ self.multiQueueLimit(queueMap1, persistent = True)
+ self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap3, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap4, msgSize = 100, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg = 10)
+ self.multiQueueLimit(queueMap6, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap7, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap8, msgSize = 100, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap9, msgSize = 100, persistent = True, fail_on_msg = 8)
+ self.multiQueueLimit(queueMap10, msgSize = 100, persistent = True, fail_on_msg = 7)
+
+ # --- Long and randomized tests ---
+
+ def test_11_Randomized(self):
+ """Randomized flow-to-disk tests"""
+ seed = long(1000.0 * time.time())
+ print "seed=0x%x" % seed
+ random.seed(seed)
+ for i in range(0, 10):
+ self.randomLimit(i)
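
For reference, the flow-to-disk behaviour exercised above is driven entirely
by the queue-declare arguments assembled in _multiQueueSetup. Below is a
minimal standalone sketch of that construction (the helper name
flow_to_disk_args is hypothetical; the qpid.* keys are the broker arguments
the tests pass):

    def flow_to_disk_args(max_count=None, max_size=None, policy="flow_to_disk"):
        """Build queue-declare arguments the way _multiQueueSetup does:
        any non-None limit enables the policy; with no limits the queue
        gets no policy and never flows to disk."""
        args = {}
        if max_count is not None:
            args["qpid.max_count"] = max_count
        if max_size is not None:
            args["qpid.max_size"] = max_size
        if args:
            args["qpid.policy_type"] = policy
        return args

    # e.g. flow_to_disk_args(max_count=10)
    #   -> {"qpid.max_count": 10, "qpid.policy_type": "flow_to_disk"}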
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2009-09-10 16:25:04 UTC (rev 3621)
+++ store/trunk/cpp/tests/run_python_tests 2009-09-11 13:35:04 UTC (rev 3622)
@@ -78,10 +78,12 @@
start_broker() {
${QPIDD} --daemon --port 0 --no-module-dir --load-module=${STORE_LIB} --data-dir=${STORE_DIR} --auth=no --log-enable info+ --log-to-file ${STORE_DIR}/broker.python-test.log > qpidd.port
LOCAL_PORT=`cat qpidd.port`
+ echo "run_python_tests: Started qpidd on port ${LOCAL_PORT}"
}
stop_broker() {
- ${QPIDD} -q --port $LOCAL_PORT
+ echo "run_python_tests: Stopping broker on port ${LOCAL_PORT}"
+ ${QPIDD} -q --port ${LOCAL_PORT}
}
fail=0
rhmessaging commits: r3621 - store/trunk/java/bdbstore/etc.
by rhmessaging-commits@lists.jboss.org
Author: ritchiem
Date: 2009-09-10 12:25:04 -0400 (Thu, 10 Sep 2009)
New Revision: 3621
Modified:
store/trunk/java/bdbstore/etc/config-systests-bdb.xml
Log:
Update config-systests-bdb.xml to allow test customisations.
Modified: store/trunk/java/bdbstore/etc/config-systests-bdb.xml
===================================================================
--- store/trunk/java/bdbstore/etc/config-systests-bdb.xml 2009-09-10 14:30:06 UTC (rev 3620)
+++ store/trunk/java/bdbstore/etc/config-systests-bdb.xml 2009-09-10 16:25:04 UTC (rev 3621)
@@ -22,6 +22,7 @@
<configuration>
<system/>
<override>
+ <xml fileName="${test.config}" config-optional="true"/>
<xml fileName="${QPID_HOME}/etc/config-systests-bdb-settings.xml"/>
<xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/>
<xml fileName="${QPID_HOME}/etc/config.xml"/>
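
The <override> entries are consulted in document order, so placing the
optional ${test.config} file first lets a per-test file shadow any setting in
the stock files below it (this first-match semantics is an assumption based
on the Commons-Configuration-style loader; a rough sketch in Python, with
made-up example keys and values):

    def resolve(key, override_files):
        """First-match lookup across an ordered list of config dicts,
        mirroring the assumed semantics of the <override> block."""
        for cfg in override_files:
            if key in cfg:
                return cfg[key]
        raise KeyError(key)

    # Hypothetical example values, for illustration only:
    test_config = {"broker.port": 15672}               # ${test.config}
    bdb_settings = {"store.class": "BDBMessageStore"}  # config-systests-bdb-settings.xml
    defaults = {"broker.port": 5672}                   # config.xml
    assert resolve("broker.port", [test_config, bdb_settings, defaults]) == 15672
    assert resolve("store.class", [test_config, bdb_settings, defaults]) == "BDBMessageStore"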
rhmessaging commits: r3620 - store/trunk/java/bdbstore/etc.
by rhmessaging-commits@lists.jboss.org
Author: ritchiem
Date: 2009-09-10 10:30:06 -0400 (Thu, 10 Sep 2009)
New Revision: 3620
Added:
store/trunk/java/bdbstore/etc/config-systests-bdb-settings.xml
store/trunk/java/bdbstore/etc/config-systests-bdb.xml
Log:
Added new hierarchy based configuration
Added: store/trunk/java/bdbstore/etc/config-systests-bdb-settings.xml
===================================================================
--- store/trunk/java/bdbstore/etc/config-systests-bdb-settings.xml (rev 0)
+++ store/trunk/java/bdbstore/etc/config-systests-bdb-settings.xml 2009-09-10 14:30:06 UTC (rev 3620)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<broker>
+ <virtualhosts>
+ <directory>${conf}/virtualhosts</directory>
+
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdb-store/</environment-path>
+ </store>
+ </localhost>
+ </virtualhost>
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdb-store/</environment-path>
+ </store>
+ </development>
+ </virtualhost>
+
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdb-store/</environment-path>
+ </store>
+ </test>
+ </virtualhost>
+
+ </virtualhosts>
+</broker>
+
+
Added: store/trunk/java/bdbstore/etc/config-systests-bdb.xml
===================================================================
--- store/trunk/java/bdbstore/etc/config-systests-bdb.xml (rev 0)
+++ store/trunk/java/bdbstore/etc/config-systests-bdb.xml 2009-09-10 14:30:06 UTC (rev 3620)
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<configuration>
+ <system/>
+ <override>
+ <xml fileName="${QPID_HOME}/etc/config-systests-bdb-settings.xml"/>
+ <xml fileName="${QPID_HOME}/etc/config-systests-settings.xml"/>
+ <xml fileName="${QPID_HOME}/etc/config.xml"/>
+ </override>
+</configuration>
rhmessaging commits: r3619 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-09-08 17:09:48 -0400 (Tue, 08 Sep 2009)
New Revision: 3619
Modified:
mgmt/trunk/cumin/python/cumin/stat.strings
Log:
Fix flash chart margins
Modified: mgmt/trunk/cumin/python/cumin/stat.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/stat.strings 2009-09-08 20:31:03 UTC (rev 3618)
+++ mgmt/trunk/cumin/python/cumin/stat.strings 2009-09-08 21:09:48 UTC (rev 3619)
@@ -69,7 +69,8 @@
margin: 0;
}
-div.StatValueChart img {
+div.StatValueChart img,
+div.StatValueChart object {
margin: 0 0 0 1em;
}
div.StatValueChart img.Loading {
rhmessaging commits: r3618 - in mgmt/trunk/wooly/python/wooly: wsgiserver and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-09-08 16:31:03 -0400 (Tue, 08 Sep 2009)
New Revision: 3618
Added:
mgmt/trunk/wooly/python/wooly/wsgiserver/
mgmt/trunk/wooly/python/wooly/wsgiserver/__init__.py
mgmt/trunk/wooly/python/wooly/wsgiserver/ssl_builtin.py
mgmt/trunk/wooly/python/wooly/wsgiserver/ssl_pyopenssl.py
Removed:
mgmt/trunk/wooly/python/wooly/wsgiserver.py
Modified:
mgmt/trunk/wooly/python/wooly/server.py
Log:
Update to rev 2525 of wsgiserver from cherrypy
Modified: mgmt/trunk/wooly/python/wooly/server.py
===================================================================
--- mgmt/trunk/wooly/python/wooly/server.py 2009-09-03 17:40:08 UTC (rev 3617)
+++ mgmt/trunk/wooly/python/wooly/server.py 2009-09-08 20:31:03 UTC (rev 3618)
@@ -16,9 +16,9 @@
self.addr = addr
self.port = port
- fqaddr = (self.addr, self.port)
- apps = [("", self.service_request)]
- self.__server = CherryPyWSGIServer(fqaddr, apps)
+ self.__server = CherryPyWSGIServer \
+ ((self.addr, self.port), self.service_request)
+ self.__server.environ["wsgi.version"] = (1, 1)
def set_ssl_cert_path(self, path):
self.__server.ssl_certificate = path
Property changes on: mgmt/trunk/wooly/python/wooly/wsgiserver
___________________________________________________________________
Name: svn:ignore
+ *.pyc
Added: mgmt/trunk/wooly/python/wooly/wsgiserver/__init__.py
===================================================================
--- mgmt/trunk/wooly/python/wooly/wsgiserver/__init__.py (rev 0)
+++ mgmt/trunk/wooly/python/wooly/wsgiserver/__init__.py 2009-09-08 20:31:03 UTC (rev 3618)
@@ -0,0 +1,1841 @@
+"""A high-speed, production ready, thread pooled, generic WSGI server.
+
+Simplest example on how to use this module directly
+(without using CherryPy's application machinery):
+
+ from cherrypy import wsgiserver
+
+ def my_crazy_app(environ, start_response):
+ status = '200 OK'
+ response_headers = [('Content-type','text/plain')]
+ start_response(status, response_headers)
+ return ['Hello world!\n']
+
+ server = wsgiserver.CherryPyWSGIServer(
+ ('0.0.0.0', 8070), my_crazy_app,
+ server_name='www.cherrypy.example')
+
+The CherryPy WSGI server can serve as many WSGI applications
+as you want in one instance by using a WSGIPathInfoDispatcher:
+
+ d = WSGIPathInfoDispatcher({'/': my_crazy_app, '/blog': my_blog_app})
+ server = wsgiserver.CherryPyWSGIServer(('0.0.0.0', 80), d)
+
+Want SSL support? Just set server.ssl_adapter to an SSLAdapter instance.
+
+This won't call the CherryPy engine (application side) at all, only the
+WSGI server, which is independent of the rest of CherryPy. Don't
+let the name "CherryPyWSGIServer" throw you; the name merely reflects
+its origin, not its coupling.
+
+For those of you wanting to understand internals of this module, here's the
+basic call flow. The server's listening thread runs a very tight loop,
+sticking incoming connections onto a Queue:
+
+ server = CherryPyWSGIServer(...)
+ server.start()
+ while True:
+ tick()
+ # This blocks until a request comes in:
+ child = socket.accept()
+ conn = HTTPConnection(child, ...)
+ server.requests.put(conn)
+
+Worker threads are kept in a pool and poll the Queue, popping off and then
+handling each connection in turn. Each connection can consist of an arbitrary
+number of requests and their responses, so we run a nested loop:
+
+ while True:
+ conn = server.requests.get()
+ conn.communicate()
+ -> while True:
+ req = HTTPRequest(...)
+ req.parse_request()
+ -> # Read the Request-Line, e.g. "GET /page HTTP/1.1"
+ req.rfile.readline()
+ req.read_headers()
+ req.respond()
+ -> response = wsgi_app(...)
+ try:
+ for chunk in response:
+ if chunk:
+ req.write(chunk)
+ finally:
+ if hasattr(response, "close"):
+ response.close()
+ if req.close_connection:
+ return
+"""
+
+CRLF = '\r\n'
+import os
+import Queue
+import re
+quoted_slash = re.compile("(?i)%2F")
+import rfc822
+import socket
+try:
+ import cStringIO as StringIO
+except ImportError:
+ import StringIO
+
+_fileobject_uses_str_type = isinstance(socket._fileobject(None)._rbuf, basestring)
+
+import sys
+import threading
+import time
+import traceback
+from urllib import unquote
+from urlparse import urlparse
+import warnings
+
+import errno
+
+def plat_specific_errors(*errnames):
+ """Return error numbers for all errors in errnames on this platform.
+
+ The 'errno' module contains different global constants depending on
+ the specific platform (OS). This function will return the list of
+ numeric values for a given list of potential names.
+ """
+ errno_names = dir(errno)
+ nums = [getattr(errno, k) for k in errnames if k in errno_names]
+ # de-dupe the list
+ return dict.fromkeys(nums).keys()
+
+socket_error_eintr = plat_specific_errors("EINTR", "WSAEINTR")
+
+socket_errors_to_ignore = plat_specific_errors(
+ "EPIPE",
+ "EBADF", "WSAEBADF",
+ "ENOTSOCK", "WSAENOTSOCK",
+ "ETIMEDOUT", "WSAETIMEDOUT",
+ "ECONNREFUSED", "WSAECONNREFUSED",
+ "ECONNRESET", "WSAECONNRESET",
+ "ECONNABORTED", "WSAECONNABORTED",
+ "ENETRESET", "WSAENETRESET",
+ "EHOSTDOWN", "EHOSTUNREACH",
+ )
+socket_errors_to_ignore.append("timed out")
+socket_errors_to_ignore.append("The read operation timed out")
+
+socket_errors_nonblocking = plat_specific_errors(
+ 'EAGAIN', 'EWOULDBLOCK', 'WSAEWOULDBLOCK')
+
+comma_separated_headers = ['ACCEPT', 'ACCEPT-CHARSET', 'ACCEPT-ENCODING',
+ 'ACCEPT-LANGUAGE', 'ACCEPT-RANGES', 'ALLOW', 'CACHE-CONTROL',
+ 'CONNECTION', 'CONTENT-ENCODING', 'CONTENT-LANGUAGE', 'EXPECT',
+ 'IF-MATCH', 'IF-NONE-MATCH', 'PRAGMA', 'PROXY-AUTHENTICATE', 'TE',
+ 'TRAILER', 'TRANSFER-ENCODING', 'UPGRADE', 'VARY', 'VIA', 'WARNING',
+ 'WWW-AUTHENTICATE']
+
+
+class WSGIPathInfoDispatcher(object):
+ """A WSGI dispatcher for dispatch based on the PATH_INFO.
+
+ apps: a dict or list of (path_prefix, app) pairs.
+ """
+
+ def __init__(self, apps):
+ try:
+ apps = apps.items()
+ except AttributeError:
+ pass
+
+ # Sort the apps by len(path), descending
+ apps.sort(cmp=lambda x,y: cmp(len(x[0]), len(y[0])))
+ apps.reverse()
+
+ # The path_prefix strings must start, but not end, with a slash.
+ # Use "" instead of "/".
+ self.apps = [(p.rstrip("/"), a) for p, a in apps]
+
+ def __call__(self, environ, start_response):
+ path = environ["PATH_INFO"] or "/"
+ for p, app in self.apps:
+ # The apps list should be sorted by length, descending.
+ if path.startswith(p + "/") or path == p:
+ environ = environ.copy()
+ environ["SCRIPT_NAME"] = environ["SCRIPT_NAME"] + p
+ environ["PATH_INFO"] = path[len(p):]
+ return app(environ, start_response)
+
+ start_response('404 Not Found', [('Content-Type', 'text/plain'),
+ ('Content-Length', '0')])
+ return ['']
+
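+# NOTE (illustration only, not in the original source): given
+#     d = WSGIPathInfoDispatcher({'/': root_app, '/blog': blog_app})
+# a request with PATH_INFO "/blog/2009" is routed to blog_app with
+# SCRIPT_NAME extended by "/blog" and PATH_INFO rewritten to "/2009";
+# a path matching no prefix falls through to the 404 response above.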
+
+class MaxSizeExceeded(Exception):
+ pass
+
+class SizeCheckWrapper(object):
+ """Wraps a file-like object, raising MaxSizeExceeded if too large."""
+
+ def __init__(self, rfile, maxlen):
+ self.rfile = rfile
+ self.maxlen = maxlen
+ self.bytes_read = 0
+
+ def _check_length(self):
+ if self.maxlen and self.bytes_read > self.maxlen:
+ raise MaxSizeExceeded()
+
+ def read(self, size=None):
+ data = self.rfile.read(size)
+ self.bytes_read += len(data)
+ self._check_length()
+ return data
+
+ def readline(self, size=None):
+ if size is not None:
+ data = self.rfile.readline(size)
+ self.bytes_read += len(data)
+ self._check_length()
+ return data
+
+ # User didn't specify a size ...
+        # We read the line in chunks to make sure it's not a 100MB line!
+ res = []
+ while True:
+ data = self.rfile.readline(256)
+ self.bytes_read += len(data)
+ self._check_length()
+ res.append(data)
+ # See http://www.cherrypy.org/ticket/421
+ if len(data) < 256 or data[-1:] == "\n":
+ return ''.join(res)
+
+ def readlines(self, sizehint=0):
+ # Shamelessly stolen from StringIO
+ total = 0
+ lines = []
+ line = self.readline()
+ while line:
+ lines.append(line)
+ total += len(line)
+ if 0 < sizehint <= total:
+ break
+ line = self.readline()
+ return lines
+
+ def close(self):
+ self.rfile.close()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ data = self.rfile.next()
+ self.bytes_read += len(data)
+ self._check_length()
+ return data
+
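+# NOTE (illustration only, not in the original source): SizeCheckWrapper is
+# used below to cap request headers (max_request_header_size) and chunked
+# bodies (max_request_body_size); crossing a limit raises MaxSizeExceeded,
+# which the caller converts into a "413 Request Entity Too Large" response.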
+
+class KnownLengthRFile(object):
+ """Wraps a file-like object, returning an empty string when exhausted."""
+
+ def __init__(self, rfile, content_length):
+ self.rfile = rfile
+ self.remaining = content_length
+
+ def read(self, size=None):
+ if self.remaining == 0:
+ return ''
+ if size is None:
+ size = self.remaining
+ else:
+ size = min(size, self.remaining)
+
+ data = self.rfile.read(size)
+ self.remaining -= len(data)
+ return data
+
+ def readline(self, size=None):
+ if self.remaining == 0:
+ return ''
+ if size is None:
+ size = self.remaining
+ else:
+ size = min(size, self.remaining)
+
+ data = self.rfile.readline(size)
+ self.remaining -= len(data)
+ return data
+
+ def readlines(self, sizehint=0):
+ # Shamelessly stolen from StringIO
+ total = 0
+ lines = []
+ line = self.readline(sizehint)
+ while line:
+ lines.append(line)
+ total += len(line)
+ if 0 < sizehint <= total:
+ break
+ line = self.readline(sizehint)
+ return lines
+
+ def close(self):
+ self.rfile.close()
+
+ def __iter__(self):
+ return self
+
+    def next(self):
+        # Python 2 iterator protocol: __iter__ above returns self, so this
+        # must be named next() (not __next__) to be reachable.
+        data = self.rfile.next()
+        self.remaining -= len(data)
+        return data
+
+
+class HTTPRequest(object):
+ """An HTTP Request (and response).
+
+ A single HTTP connection may consist of multiple request/response pairs.
+
+ send: the 'send' method from the connection's socket object.
+ wsgi_app: the WSGI application to call.
+ environ: a partial WSGI environ (server and connection entries).
+ Because this server supports both WSGI 1.0 and 1.1, this attribute is
+ neither; instead, it has unicode keys and byte string values. It is
+ converted to the appropriate WSGI version when the WSGI app is called.
+
+ The caller MUST set the following entries (because this class doesn't):
+ * All wsgi.* entries except .input and .url_encoding
+ * SERVER_NAME and SERVER_PORT
+ * Any SSL_* entries
+ * Any custom entries like REMOTE_ADDR and REMOTE_PORT
+ * SERVER_SOFTWARE: the value to write in the "Server" response header.
+ * ACTUAL_SERVER_PROTOCOL: the value to write in the Status-Line of
+ the response. From RFC 2145: "An HTTP server SHOULD send a
+ response version equal to the highest version for which the
+ server is at least conditionally compliant, and whose major
+ version is less than or equal to the one received in the
+ request. An HTTP server MUST NOT send a version for which
+ it is not at least conditionally compliant."
+
+ outheaders: a list of header tuples to write in the response.
+ ready: when True, the request has been parsed and is ready to begin
+ generating the response. When False, signals the calling Connection
+ that the response should not be generated and the connection should
+ close.
+ close_connection: signals the calling Connection that the request
+ should close. This does not imply an error! The client and/or
+ server may each request that the connection be closed.
+ chunked_write: if True, output will be encoded with the "chunked"
+ transfer-coding. This value is set automatically inside
+ send_headers.
+ """
+
+ max_request_header_size = 0
+ max_request_body_size = 0
+
+ def __init__(self, rfile, wfile, environ, wsgi_app):
+ self._rfile = rfile
+ self.rfile = rfile
+ self.wfile = wfile
+ self.environ = environ.copy()
+ self.wsgi_app = wsgi_app
+
+ self.ready = False
+ self.started_request = False
+ self.started_response = False
+ self.status = ""
+ self.outheaders = []
+ self.sent_headers = False
+ self.close_connection = False
+ self.chunked_write = False
+
+ def parse_request(self):
+ """Parse the next HTTP request start-line and message-headers."""
+ self.rfile = SizeCheckWrapper(self._rfile, self.max_request_header_size)
+ try:
+ self._parse_request()
+ except MaxSizeExceeded:
+ self.simple_response("413 Request Entity Too Large")
+ return
+
+ def _parse_request(self):
+ # HTTP/1.1 connections are persistent by default. If a client
+ # requests a page, then idles (leaves the connection open),
+ # then rfile.readline() will raise socket.error("timed out").
+ # Note that it does this based on the value given to settimeout(),
+ # and doesn't need the client to request or acknowledge the close
+ # (although your TCP stack might suffer for it: cf Apache's history
+ # with FIN_WAIT_2).
+ request_line = self.rfile.readline()
+
+ # Set started_request to True so communicate() knows to send 408
+ # from here on out.
+ self.started_request = True
+ if not request_line:
+ # Force self.ready = False so the connection will close.
+ self.ready = False
+ return
+
+ if request_line == CRLF:
+ # RFC 2616 sec 4.1: "...if the server is reading the protocol
+ # stream at the beginning of a message and receives a CRLF
+ # first, it should ignore the CRLF."
+ # But only ignore one leading line! else we enable a DoS.
+ request_line = self.rfile.readline()
+ if not request_line:
+ self.ready = False
+ return
+
+ if not request_line.endswith(CRLF):
+ self.simple_response(400, "HTTP requires CRLF terminators")
+ return
+
+ environ = self.environ
+
+ try:
+ method, uri, req_protocol = request_line.strip().split(" ", 2)
+ except ValueError:
+ self.simple_response(400, "Malformed Request-Line")
+ return
+
+ environ["REQUEST_URI"] = uri
+ environ["REQUEST_METHOD"] = method
+
+ # uri may be an abs_path (including "http://host.domain.tld");
+ scheme, authority, path = self.parse_request_uri(uri)
+ if '#' in path:
+ self.simple_response("400 Bad Request",
+ "Illegal #fragment in Request-URI.")
+ return
+
+ if scheme:
+ environ["wsgi.url_scheme"] = scheme
+
+ environ["SCRIPT_NAME"] = ""
+
+ qs = ''
+ if '?' in path:
+ path, qs = path.split('?', 1)
+
+ # Unquote the path+params (e.g. "/this%20path" -> "this path").
+ # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
+ #
+ # But note that "...a URI must be separated into its components
+ # before the escaped characters within those components can be
+ # safely decoded." http://www.ietf.org/rfc/rfc2396.txt, sec 2.4.2
+ try:
+ atoms = [unquote(x) for x in quoted_slash.split(path)]
+ except ValueError, ex:
+ self.simple_response("400 Bad Request", ex.args[0])
+ return
+ path = "%2F".join(atoms)
+ environ["PATH_INFO"] = path
+
+ # Note that, like wsgiref and most other WSGI servers,
+ # we "% HEX HEX"-unquote the path but not the query string.
+ environ["QUERY_STRING"] = qs
+
+ # Compare request and server HTTP protocol versions, in case our
+ # server does not support the requested protocol. Limit our output
+ # to min(req, server). We want the following output:
+ # request server actual written supported response
+ # protocol protocol response protocol feature set
+ # a 1.0 1.0 1.0 1.0
+ # b 1.0 1.1 1.1 1.0
+ # c 1.1 1.0 1.0 1.0
+ # d 1.1 1.1 1.1 1.1
+ # Notice that, in (b), the response will be "HTTP/1.1" even though
+ # the client only understands 1.0. RFC 2616 10.5.6 says we should
+ # only return 505 if the _major_ version is different.
+ rp = int(req_protocol[5]), int(req_protocol[7])
+ server_protocol = environ["ACTUAL_SERVER_PROTOCOL"]
+ sp = int(server_protocol[5]), int(server_protocol[7])
+
+ if sp[0] != rp[0]:
+ self.simple_response("505 HTTP Version Not Supported")
+ return
+ # Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
+ environ["SERVER_PROTOCOL"] = req_protocol
+ self.response_protocol = "HTTP/%s.%s" % min(rp, sp)
+
+ # then all the http headers
+ try:
+ self.read_headers()
+ except ValueError, ex:
+ self.simple_response("400 Bad Request", ex.args[0])
+ return
+
+ mrbs = self.max_request_body_size
+ if mrbs and int(environ.get("CONTENT_LENGTH", 0)) > mrbs:
+ self.simple_response("413 Request Entity Too Large")
+ return
+
+ # Persistent connection support
+ if self.response_protocol == "HTTP/1.1":
+ # Both server and client are HTTP/1.1
+ if environ.get("HTTP_CONNECTION", "") == "close":
+ self.close_connection = True
+ else:
+ # Either the server or client (or both) are HTTP/1.0
+ if environ.get("HTTP_CONNECTION", "") != "Keep-Alive":
+ self.close_connection = True
+
+ # Transfer-Encoding support
+ te = None
+ if self.response_protocol == "HTTP/1.1":
+ te = environ.get("HTTP_TRANSFER_ENCODING")
+ if te:
+ te = [x.strip().lower() for x in te.split(",") if x.strip()]
+
+ self.chunked_read = False
+
+ if te:
+ for enc in te:
+ if enc == "chunked":
+ self.chunked_read = True
+ else:
+ # Note that, even if we see "chunked", we must reject
+ # if there is an extension we don't recognize.
+ self.simple_response("501 Unimplemented")
+ self.close_connection = True
+ return
+
+ # From PEP 333:
+ # "Servers and gateways that implement HTTP 1.1 must provide
+ # transparent support for HTTP 1.1's "expect/continue" mechanism.
+ # This may be done in any of several ways:
+ # 1. Respond to requests containing an Expect: 100-continue request
+ # with an immediate "100 Continue" response, and proceed normally.
+ # 2. Proceed with the request normally, but provide the application
+ # with a wsgi.input stream that will send the "100 Continue"
+ # response if/when the application first attempts to read from
+ # the input stream. The read request must then remain blocked
+ # until the client responds.
+ # 3. Wait until the client decides that the server does not support
+ # expect/continue, and sends the request body on its own.
+ # (This is suboptimal, and is not recommended.)
+ #
+ # We used to do 3, but are now doing 1. Maybe we'll do 2 someday,
+ # but it seems like it would be a big slowdown for such a rare case.
+ if environ.get("HTTP_EXPECT", "") == "100-continue":
+ # Don't use simple_response here, because it emits headers
+ # we don't want. See http://www.cherrypy.org/ticket/951
+ msg = self.environ['ACTUAL_SERVER_PROTOCOL'] + " 100 Continue\r\n\r\n"
+ try:
+ self.wfile.sendall(msg)
+ except socket.error, x:
+ if x.args[0] not in socket_errors_to_ignore:
+ raise
+
+ self.ready = True
+
+ def parse_request_uri(self, uri):
+ """Parse a Request-URI into (scheme, authority, path).
+
+ Note that Request-URI's must be one of:
+
+ Request-URI = "*" | absoluteURI | abs_path | authority
+
+ Therefore, a Request-URI which starts with a double forward-slash
+ cannot be a "net_path":
+
+ net_path = "//" authority [ abs_path ]
+
+ Instead, it must be interpreted as an "abs_path" with an empty first
+ path segment:
+
+ abs_path = "/" path_segments
+ path_segments = segment *( "/" segment )
+ segment = *pchar *( ";" param )
+ param = *pchar
+ """
+ if uri == "*":
+ return None, None, uri
+
+ i = uri.find('://')
+ if i > 0:
+ # An absoluteURI.
+ # If there's a scheme (and it must be http or https), then:
+ # http_URL = "http:" "//" host [ ":" port ] [ abs_path [ "?" query ]]
+ scheme, remainder = uri[:i].lower(), uri[i + 3:]
+ authority, path = remainder.split("/", 1)
+ return scheme, authority, path
+
+ if uri.startswith('/'):
+ # An abs_path.
+ return None, None, uri
+ else:
+ # An authority.
+ return None, uri, None
+
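+    # NOTE (illustration only, not in the original source): for example,
+    #     parse_request_uri("/page?x=1")        -> (None, None, "/page?x=1")
+    #     parse_request_uri("http://h:80/a/b")  -> ("http", "h:80", "a/b")
+    #     parse_request_uri("*")                -> (None, None, "*")
+    # In the absoluteURI case the returned path has no leading slash, since
+    # the authority/path split consumes it.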
+
+ def read_headers(self):
+ """Read header lines from the incoming stream."""
+ environ = self.environ
+
+ while True:
+ line = self.rfile.readline()
+ if not line:
+ # No more data--illegal end of headers
+ raise ValueError("Illegal end of headers.")
+
+ if line == CRLF:
+ # Normal end of headers
+ break
+ if not line.endswith(CRLF):
+ raise ValueError("HTTP requires CRLF terminators")
+
+ if line[0] in ' \t':
+ # It's a continuation line.
+ v = line.strip()
+ else:
+ try:
+ k, v = line.split(":", 1)
+ except ValueError:
+ raise ValueError("Illegal header line.")
+ k = k.strip().decode('ISO-8859-1').upper()
+ v = v.strip()
+ envname = "HTTP_" + k.replace("-", "_")
+
+ if k in comma_separated_headers:
+ existing = environ.get(envname)
+ if existing:
+ v = ", ".join((existing, v))
+ environ[envname] = v
+
+ ct = environ.pop("HTTP_CONTENT_TYPE", None)
+ if ct is not None:
+ environ["CONTENT_TYPE"] = ct
+ cl = environ.pop("HTTP_CONTENT_LENGTH", None)
+ if cl is not None:
+ environ["CONTENT_LENGTH"] = cl
+
+ def decode_chunked(self):
+ """Decode the 'chunked' transfer coding."""
+ self.rfile = SizeCheckWrapper(self._rfile, self.max_request_body_size)
+ cl = 0
+ data = StringIO.StringIO()
+ while True:
+ line = self.rfile.readline().strip().split(";", 1)
+ try:
+ chunk_size = line.pop(0)
+ chunk_size = int(chunk_size, 16)
+ except ValueError:
+ self.simple_response("400 Bad Request",
+ "Bad chunked transfer size: " + repr(chunk_size))
+ return
+ if chunk_size <= 0:
+ break
+## if line: chunk_extension = line[0]
+ cl += chunk_size
+ data.write(self.rfile.read(chunk_size))
+ crlf = self.rfile.read(2)
+ if crlf != CRLF:
+ self.simple_response("400 Bad Request",
+ "Bad chunked transfer coding (expected '\\r\\n', "
+ "got " + repr(crlf) + ")")
+ return
+
+ # Grab any trailer headers
+ self.read_headers()
+
+ data.seek(0)
+ self.rfile = data
+ self.environ["CONTENT_LENGTH"] = str(cl) or ""
+ return True
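+        # NOTE (illustration only, not in the original source): the wire
+        # format decoded above is <hex-size>[;extension] CRLF <data> CRLF
+        # per chunk, ended by a zero-size chunk and optional trailer
+        # headers; e.g. "5\r\nhello\r\n0\r\n\r\n" yields a body of "hello"
+        # and CONTENT_LENGTH "5".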
+
+ def respond(self):
+ """Call the appropriate WSGI app and write its iterable output."""
+ if self.chunked_read:
+ # If chunked, Content-Length will be 0.
+ try:
+ if not self.decode_chunked():
+ self.close_connection = True
+ return
+ except MaxSizeExceeded:
+ self.simple_response("413 Request Entity Too Large")
+ return
+ else:
+ cl = int(self.environ.get("CONTENT_LENGTH", 0))
+ if self.max_request_body_size and self.max_request_body_size < cl:
+ if not self.sent_headers:
+ self.simple_response("413 Request Entity Too Large")
+ return
+ self.rfile = KnownLengthRFile(self._rfile, cl)
+
+ self.environ["wsgi.input"] = self.rfile
+ self._respond()
+
+ def _respond(self):
+ env = self.get_version_specific_environ()
+ #for k, v in sorted(env.items()):
+ # print(k, '=', v)
+ response = self.wsgi_app(env, self.start_response)
+ try:
+ for chunk in response:
+ # "The start_response callable must not actually transmit
+ # the response headers. Instead, it must store them for the
+ # server or gateway to transmit only after the first
+ # iteration of the application return value that yields
+ # a NON-EMPTY string, or upon the application's first
+ # invocation of the write() callable." (PEP 333)
+ if chunk:
+ if isinstance(chunk, unicode):
+ chunk = chunk.encode('ISO-8859-1')
+ self.write(chunk)
+ finally:
+ if hasattr(response, "close"):
+ response.close()
+
+ if (self.ready and not self.sent_headers):
+ self.sent_headers = True
+ self.send_headers()
+ if self.chunked_write:
+ self.wfile.sendall("0\r\n\r\n")
+
+ def get_version_specific_environ(self):
+ """Return a new environ dict targeting the given wsgi.version"""
+ # Note that our internal environ type has keys decoded with ISO-8859-1
+ # but byte string values.
+ if self.environ["wsgi.version"] == (1, 0):
+ # Encode all keys.
+ env10 = {}
+ for k, v in self.environ.items():
+ if isinstance(k, unicode):
+ k = k.encode('ISO-8859-1')
+ env10[k] = v
+ return env10
+
+ env11 = self.environ.copy()
+
+ # Request-URI
+ env11.setdefault('wsgi.url_encoding', 'utf-8')
+ try:
+ for key in ["PATH_INFO", "SCRIPT_NAME", "QUERY_STRING"]:
+ env11[key] = self.environ[key].decode(env11['wsgi.url_encoding'])
+ except UnicodeDecodeError:
+ # Fall back to latin 1 so apps can transcode if needed.
+ env11['wsgi.url_encoding'] = 'ISO-8859-1'
+ for key in ["PATH_INFO", "SCRIPT_NAME", "QUERY_STRING"]:
+ env11[key] = self.environ[key].decode(env11['wsgi.url_encoding'])
+
+ for k, v in sorted(env11.items()):
+ if isinstance(v, str) and k not in (
+ 'REQUEST_URI', 'PATH_INFO', 'SCRIPT_NAME', 'QUERY_STRING',
+ 'wsgi.input'):
+ env11[k] = v.decode('ISO-8859-1')
+
+ return env11
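+    # NOTE (illustration only, not in the original source): under
+    # wsgi.version (1, 0) the keys are encoded to byte strings and values
+    # are left as bytes; under (1, 1) PATH_INFO/SCRIPT_NAME/QUERY_STRING are
+    # decoded with wsgi.url_encoding and most other str values are decoded
+    # as ISO-8859-1, so the application sees unicode.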
+
+ def simple_response(self, status, msg=""):
+ """Write a simple response back to the client."""
+ status = str(status)
+ buf = [self.environ['ACTUAL_SERVER_PROTOCOL'] + " " +
+ status + CRLF,
+ "Content-Length: %s\r\n" % len(msg),
+ "Content-Type: text/plain\r\n"]
+
+ if status[:3] == "413" and self.response_protocol == 'HTTP/1.1':
+ # Request Entity Too Large
+ self.close_connection = True
+ buf.append("Connection: close\r\n")
+
+ buf.append(CRLF)
+ if msg:
+ if isinstance(msg, unicode):
+ msg = msg.encode("ISO-8859-1")
+ buf.append(msg)
+
+ try:
+ self.wfile.sendall("".join(buf))
+ except socket.error, x:
+ if x.args[0] not in socket_errors_to_ignore:
+ raise
+
+ def start_response(self, status, headers, exc_info = None):
+ """WSGI callable to begin the HTTP response."""
+ # "The application may call start_response more than once,
+ # if and only if the exc_info argument is provided."
+ if self.started_response and not exc_info:
+ raise AssertionError("WSGI start_response called a second "
+ "time with no exc_info.")
+
+ # "if exc_info is provided, and the HTTP headers have already been
+ # sent, start_response must raise an error, and should raise the
+ # exc_info tuple."
+ if self.sent_headers:
+ try:
+ raise exc_info[0], exc_info[1], exc_info[2]
+ finally:
+ exc_info = None
+
+ self.started_response = True
+ self.status = status
+ self.outheaders.extend(headers)
+ return self.write
+
+ def write(self, chunk):
+ """WSGI callable to write unbuffered data to the client.
+
+ This method is also used internally by start_response (to write
+ data from the iterable returned by the WSGI application).
+ """
+ if not self.started_response:
+ raise AssertionError("WSGI write called before start_response.")
+
+ if not self.sent_headers:
+ self.sent_headers = True
+ self.send_headers()
+
+ if self.chunked_write and chunk:
+ buf = [hex(len(chunk))[2:], CRLF, chunk, CRLF]
+ self.wfile.sendall("".join(buf))
+ else:
+ self.wfile.sendall(chunk)
+
+ def send_headers(self):
+ """Assert, process, and send the HTTP response message-headers."""
+ hkeys = [key.lower() for key, value in self.outheaders]
+ status = int(self.status[:3])
+
+ if status == 413:
+ # Request Entity Too Large. Close conn to avoid garbage.
+ self.close_connection = True
+ elif "content-length" not in hkeys:
+ # "All 1xx (informational), 204 (no content),
+ # and 304 (not modified) responses MUST NOT
+ # include a message-body." So no point chunking.
+ if status < 200 or status in (204, 205, 304):
+ pass
+ else:
+ if (self.response_protocol == 'HTTP/1.1'
+ and self.environ["REQUEST_METHOD"] != 'HEAD'):
+ # Use the chunked transfer-coding
+ self.chunked_write = True
+ self.outheaders.append(("Transfer-Encoding", "chunked"))
+ else:
+ # Closing the conn is the only way to determine len.
+ self.close_connection = True
+
+ if "connection" not in hkeys:
+ if self.response_protocol == 'HTTP/1.1':
+ # Both server and client are HTTP/1.1 or better
+ if self.close_connection:
+ self.outheaders.append(("Connection", "close"))
+ else:
+ # Server and/or client are HTTP/1.0
+ if not self.close_connection:
+ self.outheaders.append(("Connection", "Keep-Alive"))
+
+ if (not self.close_connection) and (not self.chunked_read):
+ # Read any remaining request body data on the socket.
+ # "If an origin server receives a request that does not include an
+ # Expect request-header field with the "100-continue" expectation,
+ # the request includes a request body, and the server responds
+ # with a final status code before reading the entire request body
+ # from the transport connection, then the server SHOULD NOT close
+ # the transport connection until it has read the entire request,
+ # or until the client closes the connection. Otherwise, the client
+ # might not reliably receive the response message. However, this
+ # requirement is not be construed as preventing a server from
+ # defending itself against denial-of-service attacks, or from
+ # badly broken client implementations."
+ remaining = getattr(self.rfile, 'remaining', 0)
+ if remaining > 0:
+ self.rfile.read(remaining)
+
+ if "date" not in hkeys:
+ self.outheaders.append(("Date", rfc822.formatdate()))
+
+ if "server" not in hkeys:
+ self.outheaders.append(("Server", self.environ['SERVER_SOFTWARE']))
+
+ buf = [self.environ['ACTUAL_SERVER_PROTOCOL'] +
+ " " + self.status + CRLF]
+ try:
+ for k, v in self.outheaders:
+ buf.append(k + ": " + v + "\r\n")
+ except TypeError:
+ if not isinstance(k, str):
+ raise TypeError("WSGI response header key %r is not a byte string." % k)
+ if not isinstance(v, str):
+ raise TypeError("WSGI response header value %r is not a byte string." % v)
+ else:
+ raise
+ buf.append(CRLF)
+ self.wfile.sendall("".join(buf))
+
+
+class NoSSLError(Exception):
+ """Exception raised when a client speaks HTTP to an HTTPS socket."""
+ pass
+
+
+class FatalSSLAlert(Exception):
+ """Exception raised when the SSL implementation signals a fatal alert."""
+ pass
+
+
+if not _fileobject_uses_str_type:
+ class CP_fileobject(socket._fileobject):
+ """Faux file object attached to a socket object."""
+
+ def sendall(self, data):
+ """Sendall for non-blocking sockets."""
+ while data:
+ try:
+ bytes_sent = self.send(data)
+ data = data[bytes_sent:]
+ except socket.error, e:
+ if e.args[0] not in socket_errors_nonblocking:
+ raise
+
+ def send(self, data):
+ return self._sock.send(data)
+
+ def flush(self):
+ if self._wbuf:
+ buffer = "".join(self._wbuf)
+ self._wbuf = []
+ self.sendall(buffer)
+
+ def recv(self, size):
+ while True:
+ try:
+ return self._sock.recv(size)
+ except socket.error, e:
+ if (e.args[0] not in socket_errors_nonblocking
+ and e.args[0] not in socket_error_eintr):
+ raise
+
+ def read(self, size=-1):
+ # Use max, disallow tiny reads in a loop as they are very inefficient.
+ # We never leave read() with any leftover data from a new recv() call
+ # in our internal buffer.
+ rbufsize = max(self._rbufsize, self.default_bufsize)
+ # Our use of StringIO rather than lists of string objects returned by
+ # recv() minimizes memory usage and fragmentation that occurs when
+ # rbufsize is large compared to the typical return value of recv().
+ buf = self._rbuf
+ buf.seek(0, 2) # seek end
+ if size < 0:
+ # Read until EOF
+ self._rbuf = StringIO.StringIO() # reset _rbuf. we consume it via buf.
+ while True:
+ data = self.recv(rbufsize)
+ if not data:
+ break
+ buf.write(data)
+ return buf.getvalue()
+ else:
+ # Read until size bytes or EOF seen, whichever comes first
+ buf_len = buf.tell()
+ if buf_len >= size:
+ # Already have size bytes in our buffer? Extract and return.
+ buf.seek(0)
+ rv = buf.read(size)
+ self._rbuf = StringIO.StringIO()
+ self._rbuf.write(buf.read())
+ return rv
+
+ self._rbuf = StringIO.StringIO() # reset _rbuf. we consume it via buf.
+ while True:
+ left = size - buf_len
+ # recv() will malloc the amount of memory given as its
+ # parameter even though it often returns much less data
+ # than that. The returned data string is short lived
+ # as we copy it into a StringIO and free it. This avoids
+ # fragmentation issues on many platforms.
+ data = self.recv(left)
+ if not data:
+ break
+ n = len(data)
+ if n == size and not buf_len:
+ # Shortcut. Avoid buffer data copies when:
+ # - We have no data in our buffer.
+ # AND
+ # - Our call to recv returned exactly the
+ # number of bytes we were asked to read.
+ return data
+ if n == left:
+ buf.write(data)
+ del data # explicit free
+ break
+ assert n <= left, "recv(%d) returned %d bytes" % (left, n)
+ buf.write(data)
+ buf_len += n
+ del data # explicit free
+ #assert buf_len == buf.tell()
+ return buf.getvalue()
+
+ def readline(self, size=-1):
+ buf = self._rbuf
+ buf.seek(0, 2) # seek end
+ if buf.tell() > 0:
+ # check if we already have it in our buffer
+ buf.seek(0)
+ bline = buf.readline(size)
+ if bline.endswith('\n') or len(bline) == size:
+ self._rbuf = StringIO.StringIO()
+ self._rbuf.write(buf.read())
+ return bline
+ del bline
+ if size < 0:
+ # Read until \n or EOF, whichever comes first
+ if self._rbufsize <= 1:
+ # Speed up unbuffered case
+ buf.seek(0)
+ buffers = [buf.read()]
+ self._rbuf = StringIO.StringIO() # reset _rbuf. we consume it via buf.
+ data = None
+ recv = self.recv
+ while data != "\n":
+ data = recv(1)
+ if not data:
+ break
+ buffers.append(data)
+ return "".join(buffers)
+
+ buf.seek(0, 2) # seek end
+ self._rbuf = StringIO.StringIO() # reset _rbuf. we consume it via buf.
+ while True:
+ data = self.recv(self._rbufsize)
+ if not data:
+ break
+ nl = data.find('\n')
+ if nl >= 0:
+ nl += 1
+ buf.write(data[:nl])
+ self._rbuf.write(data[nl:])
+ del data
+ break
+ buf.write(data)
+ return buf.getvalue()
+ else:
+ # Read until size bytes or \n or EOF seen, whichever comes first
+ buf.seek(0, 2) # seek end
+ buf_len = buf.tell()
+ if buf_len >= size:
+ buf.seek(0)
+ rv = buf.read(size)
+ self._rbuf = StringIO.StringIO()
+ self._rbuf.write(buf.read())
+ return rv
+ self._rbuf = StringIO.StringIO() # reset _rbuf. we consume it via buf.
+ while True:
+ data = self.recv(self._rbufsize)
+ if not data:
+ break
+ left = size - buf_len
+ # did we just receive a newline?
+ nl = data.find('\n', 0, left)
+ if nl >= 0:
+ nl += 1
+ # save the excess data to _rbuf
+ self._rbuf.write(data[nl:])
+ if buf_len:
+ buf.write(data[:nl])
+ break
+ else:
+ # Shortcut. Avoid data copy through buf when returning
+ # a substring of our first recv().
+ return data[:nl]
+ n = len(data)
+ if n == size and not buf_len:
+ # Shortcut. Avoid data copy through buf when
+ # returning exactly all of our first recv().
+ return data
+ if n >= left:
+ buf.write(data[:left])
+ self._rbuf.write(data[left:])
+ break
+ buf.write(data)
+ buf_len += n
+ #assert buf_len == buf.tell()
+ return buf.getvalue()
+
+else:
+ class CP_fileobject(socket._fileobject):
+ """Faux file object attached to a socket object."""
+
+ def sendall(self, data):
+ """Sendall for non-blocking sockets."""
+ while data:
+ try:
+ bytes_sent = self.send(data)
+ data = data[bytes_sent:]
+ except socket.error, e:
+ if e.args[0] not in socket_errors_nonblocking:
+ raise
+
+ def send(self, data):
+ return self._sock.send(data)
+
+ def flush(self):
+ if self._wbuf:
+ buffer = "".join(self._wbuf)
+ self._wbuf = []
+ self.sendall(buffer)
+
+ def recv(self, size):
+ while True:
+ try:
+ return self._sock.recv(size)
+ except socket.error, e:
+ if (e.args[0] not in socket_errors_nonblocking
+ and e.args[0] not in socket_error_eintr):
+ raise
+
+ def read(self, size=-1):
+ if size < 0:
+ # Read until EOF
+ buffers = [self._rbuf]
+ self._rbuf = ""
+ if self._rbufsize <= 1:
+ recv_size = self.default_bufsize
+ else:
+ recv_size = self._rbufsize
+
+ while True:
+ data = self.recv(recv_size)
+ if not data:
+ break
+ buffers.append(data)
+ return "".join(buffers)
+ else:
+ # Read until size bytes or EOF seen, whichever comes first
+ data = self._rbuf
+ buf_len = len(data)
+ if buf_len >= size:
+ self._rbuf = data[size:]
+ return data[:size]
+ buffers = []
+ if data:
+ buffers.append(data)
+ self._rbuf = ""
+ while True:
+ left = size - buf_len
+ recv_size = max(self._rbufsize, left)
+ data = self.recv(recv_size)
+ if not data:
+ break
+ buffers.append(data)
+ n = len(data)
+ if n >= left:
+ self._rbuf = data[left:]
+ buffers[-1] = data[:left]
+ break
+ buf_len += n
+ return "".join(buffers)
+
+ def readline(self, size=-1):
+ data = self._rbuf
+ if size < 0:
+ # Read until \n or EOF, whichever comes first
+ if self._rbufsize <= 1:
+ # Speed up unbuffered case
+ assert data == ""
+ buffers = []
+ while data != "\n":
+ data = self.recv(1)
+ if not data:
+ break
+ buffers.append(data)
+ return "".join(buffers)
+ nl = data.find('\n')
+ if nl >= 0:
+ nl += 1
+ self._rbuf = data[nl:]
+ return data[:nl]
+ buffers = []
+ if data:
+ buffers.append(data)
+ self._rbuf = ""
+ while True:
+ data = self.recv(self._rbufsize)
+ if not data:
+ break
+ buffers.append(data)
+ nl = data.find('\n')
+ if nl >= 0:
+ nl += 1
+ self._rbuf = data[nl:]
+ buffers[-1] = data[:nl]
+ break
+ return "".join(buffers)
+ else:
+ # Read until size bytes or \n or EOF seen, whichever comes first
+ nl = data.find('\n', 0, size)
+ if nl >= 0:
+ nl += 1
+ self._rbuf = data[nl:]
+ return data[:nl]
+ buf_len = len(data)
+ if buf_len >= size:
+ self._rbuf = data[size:]
+ return data[:size]
+ buffers = []
+ if data:
+ buffers.append(data)
+ self._rbuf = ""
+ while True:
+ data = self.recv(self._rbufsize)
+ if not data:
+ break
+ buffers.append(data)
+ left = size - buf_len
+ nl = data.find('\n', 0, left)
+ if nl >= 0:
+ nl += 1
+ self._rbuf = data[nl:]
+ buffers[-1] = data[:nl]
+ break
+ n = len(data)
+ if n >= left:
+ self._rbuf = data[left:]
+ buffers[-1] = data[:left]
+ break
+ buf_len += n
+ return "".join(buffers)
+
+
+class HTTPConnection(object):
+ """An HTTP connection (active socket).
+
+ socket: the raw socket object (usually TCP) for this connection.
+ wsgi_app: the WSGI application for this server/connection.
+ environ: a WSGI environ template. This will be copied for each request.
+
+ rfile: a fileobject for reading from the socket.
+    wfile: a fileobject for writing to the socket.
+ """
+
+ rbufsize = -1
+ RequestHandlerClass = HTTPRequest
+ environ = {"wsgi.url_scheme": "http",
+ "wsgi.multithread": True,
+ "wsgi.multiprocess": False,
+ "wsgi.run_once": False,
+ "wsgi.errors": sys.stderr,
+ }
+
+ def __init__(self, sock, wsgi_app, environ, makefile=CP_fileobject):
+ self.socket = sock
+ self.wsgi_app = wsgi_app
+
+ # Copy the class environ into self.
+ self.environ = self.environ.copy()
+ self.environ.update(environ)
+
+ self.rfile = makefile(sock, "rb", self.rbufsize)
+ self.wfile = makefile(sock, "wb", -1)
+
+ def communicate(self):
+ """Read each request and respond appropriately."""
+ request_seen = False
+ try:
+ while True:
+ # (re)set req to None so that if something goes wrong in
+ # the RequestHandlerClass constructor, the error doesn't
+ # get written to the previous request.
+ req = None
+ req = self.RequestHandlerClass(
+ self.rfile, self.wfile, self.environ, self.wsgi_app)
+
+ # This order of operations should guarantee correct pipelining.
+ req.parse_request()
+ if not req.ready:
+ # Something went wrong in the parsing (and the server has
+ # probably already made a simple_response). Return and
+ # let the conn close.
+ return
+
+ request_seen = True
+ req.respond()
+ if req.close_connection:
+ return
+ except socket.error, e:
+ errnum = e.args[0]
+ if errnum == 'timed out':
+ # Don't error if we're between requests; only error
+ # if 1) no request has been started at all, or 2) we're
+ # in the middle of a request.
+ # See http://www.cherrypy.org/ticket/853
+ if (not request_seen) or (req and req.started_request):
+ # Don't bother writing the 408 if the response
+ # has already started being written.
+ if req and not req.sent_headers:
+ try:
+ req.simple_response("408 Request Timeout")
+ except FatalSSLAlert:
+ # Close the connection.
+ return
+ elif errnum not in socket_errors_to_ignore:
+ if req and not req.sent_headers:
+ try:
+ req.simple_response("500 Internal Server Error",
+ format_exc())
+ except FatalSSLAlert:
+ # Close the connection.
+ return
+ return
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except FatalSSLAlert:
+ # Close the connection.
+ return
+ except NoSSLError:
+ if req and not req.sent_headers:
+ # Unwrap our wfile
+ req.wfile = CP_fileobject(self.socket._sock, "wb", -1)
+ req.simple_response("400 Bad Request",
+ "The client sent a plain HTTP request, but "
+ "this server only speaks HTTPS on this port.")
+ self.linger = True
+ except Exception:
+ if req and not req.sent_headers:
+ try:
+ req.simple_response("500 Internal Server Error", format_exc())
+ except FatalSSLAlert:
+ # Close the connection.
+ return
+
+ linger = False
+
+ def close(self):
+ """Close the socket underlying this connection."""
+ self.rfile.close()
+
+ if not self.linger:
+ # Python's socket module does NOT call close on the kernel socket
+ # when you call socket.close(). We do so manually here because we
+ # want this server to send a FIN TCP segment immediately. Note this
+ # must be called *before* calling socket.close(), because the latter
+ # drops its reference to the kernel socket.
+ if hasattr(self.socket, '_sock'):
+ self.socket._sock.close()
+ self.socket.close()
+ else:
+ # On the other hand, sometimes we want to hang around for a bit
+ # to make sure the client has a chance to read our entire
+ # response. Skipping the close() calls here delays the FIN
+ # packet until the socket object is garbage-collected later.
+ # Someday, perhaps, we'll do the full lingering_close that
+ # Apache does, but not today.
+ pass
+
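+# A minimal usage sketch (illustrative only; 'sock' is an already-accepted
+# connection socket and 'my_app' a WSGI callable, neither defined here).
+# CherryPyWSGIServer.tick() below constructs connections in roughly this way:
+#
+#     conn = HTTPConnection(sock, my_app, {"SERVER_NAME": "localhost",
+#                                          "SERVER_PORT": "8080"})
+#     try:
+#         conn.communicate()   # serve request/response pairs until close
+#     finally:
+#         conn.close()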
+
+def format_exc(limit=None):
+ """Like print_exc() but return a string. Backport for Python 2.3."""
+ try:
+ etype, value, tb = sys.exc_info()
+ return ''.join(traceback.format_exception(etype, value, tb, limit))
+ finally:
+ etype = value = tb = None
+
+
+_SHUTDOWNREQUEST = None
+
+class WorkerThread(threading.Thread):
+ """Thread which continuously polls a Queue for Connection objects.
+
+ server: the HTTP Server which spawned this thread, and which owns the
+ Queue and is placing active connections into it.
+ ready: a simple flag for the calling server to know when this thread
+ has begun polling the Queue.
+
+ Due to the timing issues of polling a Queue, a WorkerThread does not
+ check its own 'ready' flag after it has started. To stop the thread,
+ it is necessary to stick a _SHUTDOWNREQUEST object onto the Queue
+ (one for each running WorkerThread).
+ """
+
+ conn = None
+
+ def __init__(self, server):
+ self.ready = False
+ self.server = server
+ threading.Thread.__init__(self)
+
+ def run(self):
+ try:
+ self.ready = True
+ while True:
+ conn = self.server.requests.get()
+ if conn is _SHUTDOWNREQUEST:
+ return
+
+ self.conn = conn
+ try:
+ conn.communicate()
+ finally:
+ conn.close()
+ self.conn = None
+ except (KeyboardInterrupt, SystemExit), exc:
+ self.server.interrupt = exc
+
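+# A sketch of the shutdown protocol described in the docstring above
+# (illustrative only; assumes 'server.requests' is a Queue-like object
+# shared by the workers). Each worker consumes exactly one sentinel
+# before returning, so put one per running thread:
+#
+#     workers = [WorkerThread(server) for _ in range(10)]
+#     for w in workers:
+#         w.start()
+#     # ... serve ...
+#     for w in workers:
+#         server.requests.put(_SHUTDOWNREQUEST)
+#     for w in workers:
+#         w.join()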
+
+class ThreadPool(object):
+ """A Request Queue for the CherryPyWSGIServer which pools threads.
+
+ ThreadPool objects must provide min, get(), put(obj), start()
+ and stop(timeout) attributes.
+ """
+
+ def __init__(self, server, min=10, max=-1):
+ self.server = server
+ self.min = min
+ self.max = max
+ self._threads = []
+ self._queue = Queue.Queue()
+ self.get = self._queue.get
+
+ def start(self):
+ """Start the pool of threads."""
+ for i in range(self.min):
+ self._threads.append(WorkerThread(self.server))
+ for worker in self._threads:
+ worker.setName("CP WSGIServer " + worker.getName())
+ worker.start()
+ for worker in self._threads:
+ while not worker.ready:
+ time.sleep(.1)
+
+ def _get_idle(self):
+ """Number of worker threads which are idle. Read-only."""
+ return len([t for t in self._threads if t.conn is None])
+ idle = property(_get_idle, doc=_get_idle.__doc__)
+
+ def put(self, obj):
+ self._queue.put(obj)
+ if obj is _SHUTDOWNREQUEST:
+ return
+
+ def grow(self, amount):
+ """Spawn new worker threads (not above self.max)."""
+ for i in range(amount):
+ if self.max > 0 and len(self._threads) >= self.max:
+ break
+ worker = WorkerThread(self.server)
+ worker.setName("CP WSGIServer " + worker.getName())
+ self._threads.append(worker)
+ worker.start()
+
+ def shrink(self, amount):
+ """Kill off worker threads (not below self.min)."""
+        # Remove any dead threads from our list. Iterate over a copy of
+        # the list, since removing entries while iterating skips their
+        # neighbors.
+        for t in self._threads[:]:
+            if not t.isAlive():
+                self._threads.remove(t)
+                amount -= 1
+
+        if amount > 0:
+            for i in range(min(amount, len(self._threads) - self.min)):
+                # Put one shutdown request on the queue for each worker we
+                # want to retire. Once a worker processes one, it
+                # terminates; its dead thread is then culled from our list
+                # on the next call to shrink().
+                self._queue.put(_SHUTDOWNREQUEST)
+
+ def stop(self, timeout=5):
+ # Must shut down threads here so the code that calls
+ # this method can know when all threads are stopped.
+ for worker in self._threads:
+ self._queue.put(_SHUTDOWNREQUEST)
+
+ # Don't join currentThread (when stop is called inside a request).
+ current = threading.currentThread()
+ while self._threads:
+ worker = self._threads.pop()
+ if worker is not current and worker.isAlive():
+ try:
+ if timeout is None or timeout < 0:
+ worker.join()
+ else:
+ worker.join(timeout)
+ if worker.isAlive():
+ # We exhausted the timeout.
+ # Forcibly shut down the socket.
+ c = worker.conn
+ if c and not c.rfile.closed:
+ try:
+ c.socket.shutdown(socket.SHUT_RD)
+ except TypeError:
+ # pyOpenSSL sockets don't take an arg
+ c.socket.shutdown()
+ worker.join()
+ except (AssertionError,
+ # Ignore repeated Ctrl-C.
+ # See http://www.cherrypy.org/ticket/691.
+ KeyboardInterrupt), exc1:
+ pass
+
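+# The pool interface above is duck-typed: the server works with any object
+# exposing min, get(), put(obj), start() and stop(timeout), assigned to
+# server.requests before start(). An illustrative stand-in (not part of
+# this module) with a single worker:
+#
+#     class SingleThreadPool(object):
+#         def __init__(self, server):
+#             self.min = 1
+#             self._queue = Queue.Queue()
+#             self.get = self._queue.get
+#             self._worker = WorkerThread(server)
+#         def put(self, obj):
+#             self._queue.put(obj)
+#         def start(self):
+#             self._worker.start()
+#         def stop(self, timeout=5):
+#             self._queue.put(_SHUTDOWNREQUEST)
+#             self._worker.join(timeout)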
+
+
+try:
+ import fcntl
+except ImportError:
+ try:
+ from ctypes import windll, WinError
+ except ImportError:
+ def prevent_socket_inheritance(sock):
+ """Dummy function, since neither fcntl nor ctypes are available."""
+ pass
+ else:
+ def prevent_socket_inheritance(sock):
+ """Mark the given socket fd as non-inheritable (Windows)."""
+ if not windll.kernel32.SetHandleInformation(sock.fileno(), 1, 0):
+ raise WinError()
+else:
+ def prevent_socket_inheritance(sock):
+ """Mark the given socket fd as non-inheritable (POSIX)."""
+ fd = sock.fileno()
+ old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)
+
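+# Usage sketch (illustrative): call this on each newly created socket so its
+# fd is not leaked into child processes; bind() and tick() below apply it to
+# the listening and accepted sockets respectively.
+#
+#     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+#     prevent_socket_inheritance(s)   # FD_CLOEXEC on POSIX; on Windows,
+#                                     # clears the handle's inherit flag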
+
+class SSLAdapter(object):
+    """Base class for SSL driver adapters.
+
+    Subclasses must implement bind(), wrap() and makefile(); see the
+    ssl_builtin and ssl_pyopenssl modules for concrete implementations.
+    """
+
+ def __init__(self, certificate, private_key, certificate_chain=None):
+ self.certificate = certificate
+ self.private_key = private_key
+ self.certificate_chain = certificate_chain
+
+ def wrap(self, sock):
+        raise NotImplementedError
+
+ def makefile(self, sock, mode='r', bufsize=-1):
+        raise NotImplementedError
+
+
+class CherryPyWSGIServer(object):
+ """An HTTP server for WSGI.
+
+ bind_addr: The interface on which to listen for connections.
+ For TCP sockets, a (host, port) tuple. Host values may be any IPv4
+ or IPv6 address, or any valid hostname. The string 'localhost' is a
+ synonym for '127.0.0.1' (or '::1', if your hosts file prefers IPv6).
+ The string '0.0.0.0' is a special IPv4 entry meaning "any active
+ interface" (INADDR_ANY), and '::' is the similar IN6ADDR_ANY for
+ IPv6. The empty string or None are not allowed.
+
+ For UNIX sockets, supply the filename as a string.
+ wsgi_app: the WSGI 'application callable'; multiple WSGI applications
+ may be passed as (path_prefix, app) pairs.
+ numthreads: the number of worker threads to create (default 10).
+ server_name: the string to set for WSGI's SERVER_NAME environ entry.
+ Defaults to socket.gethostname().
+    max: the maximum number of worker threads to create (defaults to -1 = no limit).
+ request_queue_size: the 'backlog' argument to socket.listen();
+ specifies the maximum number of queued connections (default 5).
+    timeout: the timeout in seconds for accepted connections (default 10).
+    shutdown_timeout: the total time, in seconds, to wait for worker
+        threads to exit cleanly when stop() is called (default 5).
+
+ nodelay: if True (the default since 3.1), sets the TCP_NODELAY socket
+ option.
+
+ protocol: the version string to write in the Status-Line of all
+ HTTP responses. For example, "HTTP/1.1" (the default). This
+ also limits the supported features used in the response.
+
+
+ SSL/HTTPS
+ ---------
+ You must have an ssl library installed and set self.ssl_adapter to an
+ instance of SSLAdapter (or a subclass) which provides the methods:
+ wrap(sock) -> wrapped socket, ssl environ dict
+ makefile(sock, mode='r', bufsize=-1) -> socket file object
+ """
+
+ protocol = "HTTP/1.1"
+ _bind_addr = "127.0.0.1"
+ version = "CherryPy/3.2.0beta"
+ ready = False
+ _interrupt = None
+
+ nodelay = True
+
+ ConnectionClass = HTTPConnection
+ environ = {}
+
+ ssl_adapter = None
+
+ def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
+ max=-1, request_queue_size=5, timeout=10, shutdown_timeout=5):
+ self.requests = ThreadPool(self, min=numthreads or 1, max=max)
+ self.environ = self.environ.copy()
+
+ self.wsgi_app = wsgi_app
+
+ self.bind_addr = bind_addr
+ if not server_name:
+ server_name = socket.gethostname()
+ self.server_name = server_name
+ self.request_queue_size = request_queue_size
+
+ self.timeout = timeout
+ self.shutdown_timeout = shutdown_timeout
+
+ def _get_numthreads(self):
+ return self.requests.min
+ def _set_numthreads(self, value):
+ self.requests.min = value
+ numthreads = property(_get_numthreads, _set_numthreads)
+
+ def __str__(self):
+ return "%s.%s(%r)" % (self.__module__, self.__class__.__name__,
+ self.bind_addr)
+
+ def _get_bind_addr(self):
+ return self._bind_addr
+ def _set_bind_addr(self, value):
+ if isinstance(value, tuple) and value[0] in ('', None):
+ # Despite the socket module docs, using '' does not
+ # allow AI_PASSIVE to work. Passing None instead
+ # returns '0.0.0.0' like we want. In other words:
+ # host AI_PASSIVE result
+ # '' Y 192.168.x.y
+ # '' N 192.168.x.y
+ # None Y 0.0.0.0
+ # None N 127.0.0.1
+ # But since you can get the same effect with an explicit
+ # '0.0.0.0', we deny both the empty string and None as values.
+ raise ValueError("Host values of '' or None are not allowed. "
+ "Use '0.0.0.0' (IPv4) or '::' (IPv6) instead "
+ "to listen on all active interfaces.")
+ self._bind_addr = value
+ bind_addr = property(_get_bind_addr, _set_bind_addr,
+ doc="""The interface on which to listen for connections.
+
+ For TCP sockets, a (host, port) tuple. Host values may be any IPv4
+ or IPv6 address, or any valid hostname. The string 'localhost' is a
+ synonym for '127.0.0.1' (or '::1', if your hosts file prefers IPv6).
+ The string '0.0.0.0' is a special IPv4 entry meaning "any active
+ interface" (INADDR_ANY), and '::' is the similar IN6ADDR_ANY for
+ IPv6. The empty string or None are not allowed.
+
+ For UNIX sockets, supply the filename as a string.""")
+
+ def start(self):
+ """Run the server forever."""
+ # We don't have to trap KeyboardInterrupt or SystemExit here,
+        # because cherrypy.server already does so, calling self.stop() for us.
+ # If you're using this server with another framework, you should
+ # trap those exceptions in whatever code block calls start().
+ self._interrupt = None
+
+ # Select the appropriate socket
+ if isinstance(self.bind_addr, basestring):
+ # AF_UNIX socket
+
+ # So we can reuse the socket...
+            try: os.unlink(self.bind_addr)
+            except OSError: pass
+
+            # So everyone can access the socket...
+            try: os.chmod(self.bind_addr, 0777)
+            except OSError: pass
+
+ info = [(socket.AF_UNIX, socket.SOCK_STREAM, 0, "", self.bind_addr)]
+ else:
+ # AF_INET or AF_INET6 socket
+ # Get the correct address family for our host (allows IPv6 addresses)
+ host, port = self.bind_addr
+ try:
+ info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+ except socket.gaierror:
+ # Probably a DNS issue. Assume IPv4.
+ info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", self.bind_addr)]
+
+ self.socket = None
+ msg = "No socket could be created"
+ for res in info:
+ af, socktype, proto, canonname, sa = res
+ try:
+ self.bind(af, socktype, proto)
+ except socket.error, msg:
+ if self.socket:
+ self.socket.close()
+ self.socket = None
+ continue
+ break
+ if not self.socket:
+ raise socket.error(msg)
+
+ # Timeout so KeyboardInterrupt can be caught on Win32
+ self.socket.settimeout(1)
+ self.socket.listen(self.request_queue_size)
+
+ # Create worker threads
+ self.requests.start()
+
+ self.ready = True
+ while self.ready:
+ self.tick()
+ if self.interrupt:
+ while self.interrupt is True:
+ # Wait for self.stop() to complete. See _set_interrupt.
+ time.sleep(0.1)
+ if self.interrupt:
+ raise self.interrupt
+
+ def bind(self, family, type, proto=0):
+ """Create (or recreate) the actual socket object."""
+ self.socket = socket.socket(family, type, proto)
+ prevent_socket_inheritance(self.socket)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ if self.nodelay and not isinstance(self.bind_addr, str):
+ self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+ if self.ssl_adapter is not None:
+ self.socket = self.ssl_adapter.bind(self.socket)
+
+ # If listening on the IPV6 any address ('::' = IN6ADDR_ANY),
+ # activate dual-stack. See http://www.cherrypy.org/ticket/871.
+ if (not isinstance(self.bind_addr, basestring)
+ and self.bind_addr[0] == '::' and family == socket.AF_INET6):
+ try:
+ self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
+ except (AttributeError, socket.error):
+ # Apparently, the socket option is not available in
+ # this machine's TCP stack
+ pass
+
+ self.socket.bind(self.bind_addr)
+
+ def tick(self):
+ """Accept a new connection and put it on the Queue."""
+ try:
+ s, addr = self.socket.accept()
+ if not self.ready:
+ return
+
+ prevent_socket_inheritance(s)
+ if hasattr(s, 'settimeout'):
+ s.settimeout(self.timeout)
+
+ environ = self.environ.copy()
+ # SERVER_SOFTWARE is common for IIS. It's also helpful for
+ # us to pass a default value for the "Server" response header.
+ if environ.get("SERVER_SOFTWARE") is None:
+ environ["SERVER_SOFTWARE"] = "%s WSGI Server" % self.version
+ # set a non-standard environ entry so the WSGI app can know what
+ # the *real* server protocol is (and what features to support).
+ # See http://www.faqs.org/rfcs/rfc2145.html.
+ environ["ACTUAL_SERVER_PROTOCOL"] = self.protocol
+ environ["SERVER_NAME"] = self.server_name
+
+ if isinstance(self.bind_addr, basestring):
+ # AF_UNIX. This isn't really allowed by WSGI, which doesn't
+ # address unix domain sockets. But it's better than nothing.
+ environ["SERVER_PORT"] = ""
+ else:
+ environ["SERVER_PORT"] = str(self.bind_addr[1])
+ # optional values
+ # Until we do DNS lookups, omit REMOTE_HOST
+ if addr is None: # sometimes this can happen
+ # figure out if AF_INET or AF_INET6.
+ if len(s.getsockname()) == 2:
+ # AF_INET
+ addr = ('0.0.0.0', 0)
+ else:
+ # AF_INET6
+ addr = ('::', 0)
+ environ["REMOTE_ADDR"] = addr[0]
+ environ["REMOTE_PORT"] = str(addr[1])
+
+ makefile = CP_fileobject
+ # if ssl cert and key are set, we try to be a secure HTTP server
+ if self.ssl_adapter is not None:
+ try:
+ s, ssl_env = self.ssl_adapter.wrap(s)
+ except NoSSLError:
+ msg = ("The client sent a plain HTTP request, but "
+ "this server only speaks HTTPS on this port.")
+ buf = ["%s 400 Bad Request\r\n" % self.protocol,
+ "Content-Length: %s\r\n" % len(msg),
+ "Content-Type: text/plain\r\n\r\n",
+ msg]
+
+ wfile = CP_fileobject(s, "wb", -1)
+ try:
+ wfile.sendall("".join(buf))
+ except socket.error, x:
+ if x.args[0] not in socket_errors_to_ignore:
+ raise
+ return
+ if not s:
+ return
+ environ.update(ssl_env)
+ makefile = self.ssl_adapter.makefile
+
+ conn = self.ConnectionClass(s, self.wsgi_app, environ, makefile)
+ self.requests.put(conn)
+ except socket.timeout:
+ # The only reason for the timeout in start() is so we can
+ # notice keyboard interrupts on Win32, which don't interrupt
+ # accept() by default
+ return
+ except socket.error, x:
+ if x.args[0] in socket_error_eintr:
+ # I *think* this is right. EINTR should occur when a signal
+ # is received during the accept() call; all docs say retry
+ # the call, and I *think* I'm reading it right that Python
+ # will then go ahead and poll for and handle the signal
+ # elsewhere. See http://www.cherrypy.org/ticket/707.
+ return
+ if x.args[0] in socket_errors_nonblocking:
+ # Just try again. See http://www.cherrypy.org/ticket/479.
+ return
+ if x.args[0] in socket_errors_to_ignore:
+ # Our socket was closed.
+ # See http://www.cherrypy.org/ticket/686.
+ return
+ raise
+
+ def _get_interrupt(self):
+ return self._interrupt
+ def _set_interrupt(self, interrupt):
+ self._interrupt = True
+ self.stop()
+ self._interrupt = interrupt
+ interrupt = property(_get_interrupt, _set_interrupt,
+ doc="Set this to an Exception instance to "
+ "interrupt the server.")
+
+ def stop(self):
+ """Gracefully shutdown a server that is serving forever."""
+ self.ready = False
+
+ sock = getattr(self, "socket", None)
+ if sock:
+ if not isinstance(self.bind_addr, basestring):
+ # Touch our own socket to make accept() return immediately.
+ try:
+ host, port = sock.getsockname()[:2]
+ except socket.error, x:
+ if x.args[0] not in socket_errors_to_ignore:
+ # Changed to use error code and not message
+ # See http://www.cherrypy.org/ticket/860.
+ raise
+ else:
+ # Note that we're explicitly NOT using AI_PASSIVE,
+ # here, because we want an actual IP to touch.
+ # localhost won't work if we've bound to a public IP,
+ # but it will if we bound to '0.0.0.0' (INADDR_ANY).
+ for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM):
+ af, socktype, proto, canonname, sa = res
+ s = None
+ try:
+ s = socket.socket(af, socktype, proto)
+ # See http://groups.google.com/group/cherrypy-users/
+ # browse_frm/thread/bbfe5eb39c904fe0
+ s.settimeout(1.0)
+ s.connect((host, port))
+ s.close()
+ except socket.error:
+ if s:
+ s.close()
+ if hasattr(sock, "close"):
+ sock.close()
+ self.socket = None
+
+ self.requests.stop(self.shutdown_timeout)
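+
+# A minimal usage sketch, adapted from the docstring of the replaced
+# wsgiserver.py (illustrative; assumes this module is importable as
+# 'wsgiserver'):
+#
+#     from wsgiserver import CherryPyWSGIServer
+#
+#     def hello_app(environ, start_response):
+#         start_response('200 OK', [('Content-Type', 'text/plain')])
+#         return ['Hello world!\n']
+#
+#     server = CherryPyWSGIServer(('localhost', 8070), hello_app,
+#                                 server_name='localhost')
+#     try:
+#         server.start()
+#     except KeyboardInterrupt:
+#         server.stop()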
Added: mgmt/trunk/wooly/python/wooly/wsgiserver/ssl_builtin.py
===================================================================
--- mgmt/trunk/wooly/python/wooly/wsgiserver/ssl_builtin.py (rev 0)
+++ mgmt/trunk/wooly/python/wooly/wsgiserver/ssl_builtin.py 2009-09-08 20:31:03 UTC (rev 3618)
@@ -0,0 +1,69 @@
+"""A library for integrating pyOpenSSL with CherryPy.
+
+The ssl module must be importable for SSL functionality.
+
+To use this module, set CherryPyWSGIServer.ssl_adapter to an instance of
+BuiltinSSLAdapter.
+
+ ssl_adapter.certificate: the filename of the server SSL certificate.
+ ssl_adapter.private_key: the filename of the server's private key file.
+"""
+
+try:
+ import ssl
+except ImportError:
+ ssl = None
+
+from cherrypy import wsgiserver
+
+
+class BuiltinSSLAdapter(wsgiserver.SSLAdapter):
+ """A wrapper for integrating Python's builtin ssl module with CherryPy."""
+
+ def __init__(self, certificate, private_key, certificate_chain=None):
+ if ssl is None:
+ raise ImportError("You must install the ssl module to use HTTPS.")
+ self.certificate = certificate
+ self.private_key = private_key
+ self.certificate_chain = certificate_chain
+
+ def bind(self, sock):
+ """Wrap and return the given socket."""
+ return sock
+
+ def wrap(self, sock):
+ """Wrap and return the given socket, plus WSGI environ entries."""
+ try:
+ s = ssl.wrap_socket(sock, do_handshake_on_connect=True,
+ server_side=True, certfile=self.certificate,
+ keyfile=self.private_key, ssl_version=ssl.PROTOCOL_SSLv23)
+ except ssl.SSLError, e:
+ if e.errno == ssl.SSL_ERROR_EOF:
+ # This is almost certainly due to the cherrypy engine
+ # 'pinging' the socket to assert it's connectable;
+ # the 'ping' isn't SSL.
+ return None, {}
+ elif e.errno == ssl.SSL_ERROR_SSL:
+ if e.args[1].endswith('http request'):
+ # The client is speaking HTTP to an HTTPS server.
+ raise wsgiserver.NoSSLError
+ raise
+ return s, self.get_environ(s)
+
+ # TODO: fill this out more with mod ssl env
+ def get_environ(self, sock):
+ """Create WSGI environ entries to be merged into each request."""
+ cipher = sock.cipher()
+ ssl_environ = {
+ "wsgi.url_scheme": "https",
+ "HTTPS": "on",
+ 'SSL_PROTOCOL': cipher[1],
+ 'SSL_CIPHER': cipher[0]
+## SSL_VERSION_INTERFACE string The mod_ssl program version
+## SSL_VERSION_LIBRARY string The OpenSSL program version
+ }
+ return ssl_environ
+
+ def makefile(self, sock, mode='r', bufsize=-1):
+ return wsgiserver.CP_fileobject(sock, mode, bufsize)
+
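+# Wiring sketch per the module docstring (illustrative; 'server' is an
+# existing CherryPyWSGIServer instance and the two filenames are
+# placeholders):
+#
+#     server.ssl_adapter = BuiltinSSLAdapter('server.crt', 'server.key')
+#     server.start()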
Added: mgmt/trunk/wooly/python/wooly/wsgiserver/ssl_pyopenssl.py
===================================================================
--- mgmt/trunk/wooly/python/wooly/wsgiserver/ssl_pyopenssl.py (rev 0)
+++ mgmt/trunk/wooly/python/wooly/wsgiserver/ssl_pyopenssl.py 2009-09-08 20:31:03 UTC (rev 3618)
@@ -0,0 +1,242 @@
+"""A library for integrating pyOpenSSL with CherryPy.
+
+The OpenSSL module must be importable for SSL functionality.
+You can obtain it from http://pyopenssl.sourceforge.net/
+
+To use this module, set CherryPyWSGIServer.ssl_adapter to an instance of
+pyOpenSSLAdapter. There are two ways to use SSL:
+
+Method One:
+ ssl_adapter.context: an instance of SSL.Context.
+
+ If this is not None, it is assumed to be an SSL.Context instance,
+ and will be passed to SSL.Connection on bind(). The developer is
+ responsible for forming a valid Context object. This approach is
+ to be preferred for more flexibility, e.g. if the cert and key are
+ streams instead of files, or need decryption, or SSL.SSLv3_METHOD
+ is desired instead of the default SSL.SSLv23_METHOD, etc. Consult
+ the pyOpenSSL documentation for complete options.
+
+Method Two (shortcut):
+ ssl_adapter.certificate: the filename of the server SSL certificate.
+ ssl_adapter.private_key: the filename of the server's private key file.
+
+ Both are None by default. If ssl_adapter.context is None, but .private_key
+ and .certificate are both given and valid, they will be read, and the
+ context will be automatically created from them.
+
+    ssl_adapter.certificate_chain: (optional) the filename of the CA's intermediate
+ certificate bundle. This is needed for cheaper "chained root" SSL
+ certificates, and should be left as None if not required.
+"""
+
+import socket
+import threading
+import time
+
+from cherrypy import wsgiserver
+
+try:
+ from OpenSSL import SSL
+ from OpenSSL import crypto
+except ImportError:
+ SSL = None
+
+
+class SSL_fileobject(wsgiserver.CP_fileobject):
+ """SSL file object attached to a socket object."""
+
+ ssl_timeout = 3
+ ssl_retry = .01
+
+ def _safe_call(self, is_reader, call, *args, **kwargs):
+ """Wrap the given call with SSL error-trapping.
+
+        is_reader: if False, EOF errors will be raised. If True, EOF errors
+ will return "" (to emulate normal sockets).
+ """
+ start = time.time()
+ while True:
+ try:
+ return call(*args, **kwargs)
+ except SSL.WantReadError:
+ # Sleep and try again. This is dangerous, because it means
+ # the rest of the stack has no way of differentiating
+ # between a "new handshake" error and "client dropped".
+ # Note this isn't an endless loop: there's a timeout below.
+ time.sleep(self.ssl_retry)
+ except SSL.WantWriteError:
+ time.sleep(self.ssl_retry)
+ except SSL.SysCallError, e:
+ if is_reader and e.args == (-1, 'Unexpected EOF'):
+ return ""
+
+ errnum = e.args[0]
+ if is_reader and errnum in wsgiserver.socket_errors_to_ignore:
+ return ""
+ raise socket.error(errnum)
+ except SSL.Error, e:
+ if is_reader and e.args == (-1, 'Unexpected EOF'):
+ return ""
+
+ thirdarg = None
+ try:
+ thirdarg = e.args[0][0][2]
+ except IndexError:
+ pass
+
+ if thirdarg == 'http request':
+ # The client is talking HTTP to an HTTPS server.
+ raise wsgiserver.NoSSLError()
+
+ raise wsgiserver.FatalSSLAlert(*e.args)
+
+ if time.time() - start > self.ssl_timeout:
+ raise socket.timeout("timed out")
+
+ def recv(self, *args, **kwargs):
+ buf = []
+ r = super(SSL_fileobject, self).recv
+ while True:
+ data = self._safe_call(True, r, *args, **kwargs)
+ buf.append(data)
+ p = self._sock.pending()
+ if not p:
+ return "".join(buf)
+
+ def sendall(self, *args, **kwargs):
+ return self._safe_call(False, super(SSL_fileobject, self).sendall,
+ *args, **kwargs)
+
+ def send(self, *args, **kwargs):
+ return self._safe_call(False, super(SSL_fileobject, self).send,
+ *args, **kwargs)
+
+
+class SSLConnection:
+ """A thread-safe wrapper for an SSL.Connection.
+
+ *args: the arguments to create the wrapped SSL.Connection(*args).
+ """
+
+ def __init__(self, *args):
+ self._ssl_conn = SSL.Connection(*args)
+ self._lock = threading.RLock()
+
+ for f in ('get_context', 'pending', 'send', 'write', 'recv', 'read',
+ 'renegotiate', 'bind', 'listen', 'connect', 'accept',
+ 'setblocking', 'fileno', 'close', 'get_cipher_list',
+ 'getpeername', 'getsockname', 'getsockopt', 'setsockopt',
+ 'makefile', 'get_app_data', 'set_app_data', 'state_string',
+ 'sock_shutdown', 'get_peer_certificate', 'want_read',
+ 'want_write', 'set_connect_state', 'set_accept_state',
+ 'connect_ex', 'sendall', 'settimeout', 'gettimeout'):
+ exec("""def %s(self, *args):
+ self._lock.acquire()
+ try:
+ return self._ssl_conn.%s(*args)
+ finally:
+ self._lock.release()
+""" % (f, f))
+
+ def shutdown(self, *args):
+ self._lock.acquire()
+ try:
+ # pyOpenSSL.socket.shutdown takes no args
+ return self._ssl_conn.shutdown()
+ finally:
+ self._lock.release()
+
+
+class pyOpenSSLAdapter(wsgiserver.SSLAdapter):
+ """A wrapper for integrating pyOpenSSL with CherryPy."""
+
+ def __init__(self, certificate, private_key, certificate_chain=None):
+ if SSL is None:
+ raise ImportError("You must install pyOpenSSL to use HTTPS.")
+
+ self.context = None
+ self.certificate = certificate
+ self.private_key = private_key
+ self.certificate_chain = certificate_chain
+ self._environ = None
+
+ def bind(self, sock):
+ """Wrap and return the given socket."""
+ if self.context is None:
+ self.context = self.get_context()
+ conn = SSLConnection(self.context, sock)
+ self._environ = self.get_environ()
+ return conn
+
+ def wrap(self, sock):
+ """Wrap and return the given socket, plus WSGI environ entries."""
+ return sock, self._environ.copy()
+
+ def get_context(self):
+ """Return an SSL.Context from self attributes."""
+ # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
+ c = SSL.Context(SSL.SSLv23_METHOD)
+ c.use_privatekey_file(self.private_key)
+ if self.certificate_chain:
+ c.load_verify_locations(self.certificate_chain)
+ c.use_certificate_file(self.certificate)
+ return c
+
+ def get_environ(self):
+ """Return WSGI environ entries to be merged into each request."""
+ ssl_environ = {
+ "wsgi.url_scheme": "https",
+ "HTTPS": "on",
+ # pyOpenSSL doesn't provide access to any of these AFAICT
+## 'SSL_PROTOCOL': 'SSLv2',
+## SSL_CIPHER string The cipher specification name
+## SSL_VERSION_INTERFACE string The mod_ssl program version
+## SSL_VERSION_LIBRARY string The OpenSSL program version
+ }
+
+ if self.certificate:
+ # Server certificate attributes
+ cert = open(self.certificate, 'rb').read()
+ cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
+ ssl_environ.update({
+ 'SSL_SERVER_M_VERSION': cert.get_version(),
+ 'SSL_SERVER_M_SERIAL': cert.get_serial_number(),
+## 'SSL_SERVER_V_START': Validity of server's certificate (start time),
+## 'SSL_SERVER_V_END': Validity of server's certificate (end time),
+ })
+
+ for prefix, dn in [("I", cert.get_issuer()),
+ ("S", cert.get_subject())]:
+ # X509Name objects don't seem to have a way to get the
+ # complete DN string. Use str() and slice it instead,
+ # because str(dn) == "<X509Name object '/C=US/ST=...'>"
+ dnstr = str(dn)[18:-2]
+
+ wsgikey = 'SSL_SERVER_%s_DN' % prefix
+ ssl_environ[wsgikey] = dnstr
+
+ # The DN should be of the form: /k1=v1/k2=v2, but we must allow
+ # for any value to contain slashes itself (in a URL).
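+            # e.g. a dnstr of "/C=US/O=Example/CN=www.example.com" yields
+            # (for prefix "S") SSL_SERVER_S_DN_C, SSL_SERVER_S_DN_O and
+            # SSL_SERVER_S_DN_CN; parsing right-to-left keeps a '/' inside
+            # a value (such as a URL) from splitting that value.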
+ while dnstr:
+ pos = dnstr.rfind("=")
+ dnstr, value = dnstr[:pos], dnstr[pos + 1:]
+ pos = dnstr.rfind("/")
+ dnstr, key = dnstr[:pos], dnstr[pos + 1:]
+ if key and value:
+ wsgikey = 'SSL_SERVER_%s_DN_%s' % (prefix, key)
+ ssl_environ[wsgikey] = value
+
+ return ssl_environ
+
+ def makefile(self, sock, mode='r', bufsize=-1):
+ if SSL and isinstance(sock, SSL.ConnectionType):
+ timeout = sock.gettimeout()
+ f = SSL_fileobject(sock, mode, bufsize)
+ f.ssl_timeout = timeout
+ return f
+ else:
+ return wsgiserver.CP_fileobject(sock, mode, bufsize)
+
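+# Wiring sketches for the two methods in the module docstring (illustrative;
+# 'server' is an existing CherryPyWSGIServer and the filenames are
+# placeholders):
+#
+#     # Method Two (shortcut): the context is built from the two files
+#     # automatically on bind().
+#     server.ssl_adapter = pyOpenSSLAdapter('server.crt', 'server.key')
+#
+#     # Method One: supply a ready-made SSL.Context for full control.
+#     adapter = pyOpenSSLAdapter('server.crt', 'server.key')
+#     ctx = SSL.Context(SSL.SSLv3_METHOD)
+#     ctx.use_privatekey_file('server.key')
+#     ctx.use_certificate_file('server.crt')
+#     adapter.context = ctx
+#     server.ssl_adapter = adapter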
Deleted: mgmt/trunk/wooly/python/wooly/wsgiserver.py
===================================================================
--- mgmt/trunk/wooly/python/wooly/wsgiserver.py 2009-09-03 17:40:08 UTC (rev 3617)
+++ mgmt/trunk/wooly/python/wooly/wsgiserver.py 2009-09-08 20:31:03 UTC (rev 3618)
@@ -1,1057 +0,0 @@
-# Copyright (c) 2004-2007, CherryPy Team (team(a)cherrypy.org)
-# All rights reserved.
-
-# Redistribution and use in source and binary forms, with or without modification,
-# are permitted provided that the following conditions are met:
-
-# * Redistributions of source code must retain the above copyright notice,
-# this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
-# * Neither the name of the CherryPy Team nor the names of its contributors
-# may be used to endorse or promote products derived from this software
-# without specific prior written permission.
-
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""A high-speed, production ready, thread pooled, generic WSGI server.
-
-Simplest example on how to use this module directly
-(without using CherryPy's application machinery):
-
- from cherrypy import wsgiserver
-
- def my_crazy_app(environ, start_response):
- status = '200 OK'
- response_headers = [('Content-type','text/plain')]
- start_response(status, response_headers)
- return ['Hello world!\n']
-
- # Here we set our application to the script_name '/'
- wsgi_apps = [('/', my_crazy_app)]
-
- server = wsgiserver.CherryPyWSGIServer(('localhost', 8070), wsgi_apps,
- server_name='localhost')
-
- # Want SSL support? Just set these attributes
- # server.ssl_certificate = <filename>
- # server.ssl_private_key = <filename>
-
- if __name__ == '__main__':
- try:
- server.start()
- except KeyboardInterrupt:
- server.stop()
-
-This won't call the CherryPy engine (application side) at all, only the
-WSGI server, which is independent from the rest of CherryPy. Don't
-let the name "CherryPyWSGIServer" throw you; the name merely reflects
-its origin, not its coupling.
-
-The CherryPy WSGI server can serve as many WSGI applications
-as you want in one instance:
-
- wsgi_apps = [('/', my_crazy_app), ('/blog', my_blog_app)]
-
-"""
-
-
-import base64
-import Queue
-import os
-import re
-quoted_slash = re.compile("(?i)%2F")
-import rfc822
-import socket
-try:
- import cStringIO as StringIO
-except ImportError:
- import StringIO
-import sys
-import threading
-import time
-import traceback
-from urllib import unquote
-from urlparse import urlparse
-
-try:
- from OpenSSL import SSL
- from OpenSSL import crypto
-except ImportError:
- SSL = None
-
-import errno
-socket_errors_to_ignore = []
-# Not all of these names will be defined for every platform.
-for _ in ("EPIPE", "ETIMEDOUT", "ECONNREFUSED", "ECONNRESET",
- "EHOSTDOWN", "EHOSTUNREACH",
- "WSAECONNABORTED", "WSAECONNREFUSED", "WSAECONNRESET",
- "WSAENETRESET", "WSAETIMEDOUT"):
- if _ in dir(errno):
- socket_errors_to_ignore.append(getattr(errno, _))
-# de-dupe the list
-socket_errors_to_ignore = dict.fromkeys(socket_errors_to_ignore).keys()
-socket_errors_to_ignore.append("timed out")
-
-comma_separated_headers = ['ACCEPT', 'ACCEPT-CHARSET', 'ACCEPT-ENCODING',
- 'ACCEPT-LANGUAGE', 'ACCEPT-RANGES', 'ALLOW', 'CACHE-CONTROL',
- 'CONNECTION', 'CONTENT-ENCODING', 'CONTENT-LANGUAGE', 'EXPECT',
- 'IF-MATCH', 'IF-NONE-MATCH', 'PRAGMA', 'PROXY-AUTHENTICATE', 'TE',
- 'TRAILER', 'TRANSFER-ENCODING', 'UPGRADE', 'VARY', 'VIA', 'WARNING',
- 'WWW-AUTHENTICATE']
-
-class HTTPRequest(object):
- """An HTTP Request (and response).
-
- A single HTTP connection may consist of multiple request/response pairs.
-
- connection: the HTTP Connection object which spawned this request.
- rfile: the 'read' fileobject from the connection's socket
- ready: when True, the request has been parsed and is ready to begin
- generating the response. When False, signals the calling Connection
- that the response should not be generated and the connection should
- close.
- close_connection: signals the calling Connection that the request
- should close. This does not imply an error! The client and/or
- server may each request that the connection be closed.
- chunked_write: if True, output will be encoded with the "chunked"
- transfer-coding. This value is set automatically inside
- send_headers.
- """
-
- def __init__(self, connection):
- self.connection = connection
- self.rfile = self.connection.rfile
- self.sendall = self.connection.sendall
- self.environ = connection.environ.copy()
-
- self.ready = False
- self.started_response = False
- self.status = ""
- self.outheaders = []
- self.sent_headers = False
- self.close_connection = False
- self.chunked_write = False
-
- def parse_request(self):
- """Parse the next HTTP request start-line and message-headers."""
- # HTTP/1.1 connections are persistent by default. If a client
- # requests a page, then idles (leaves the connection open),
- # then rfile.readline() will raise socket.error("timed out").
- # Note that it does this based on the value given to settimeout(),
- # and doesn't need the client to request or acknowledge the close
- # (although your TCP stack might suffer for it: cf Apache's history
- # with FIN_WAIT_2).
- request_line = self.rfile.readline()
- if not request_line:
- # Force self.ready = False so the connection will close.
- self.ready = False
- return
-
- if request_line == "\r\n":
- # RFC 2616 sec 4.1: "...if the server is reading the protocol
- # stream at the beginning of a message and receives a CRLF
- # first, it should ignore the CRLF."
- # But only ignore one leading line! else we enable a DoS.
- request_line = self.rfile.readline()
- if not request_line:
- self.ready = False
- return
-
- server = self.connection.server
- environ = self.environ
- environ["SERVER_SOFTWARE"] = "%s WSGI Server" % server.version
-
- method, path, req_protocol = request_line.strip().split(" ", 2)
- environ["REQUEST_METHOD"] = method
-
- # path may be an abs_path (including "http://host.domain.tld");
- scheme, location, path, params, qs, frag = urlparse(path)
-
- if frag:
- self.simple_response("400 Bad Request",
- "Illegal #fragment in Request-URI.")
- return
-
- if scheme:
- environ["wsgi.url_scheme"] = scheme
- if params:
- path = path + ";" + params
-
- # Unquote the path+params (e.g. "/this%20path" -> "this path").
- # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
- #
- # But note that "...a URI must be separated into its components
- # before the escaped characters within those components can be
- # safely decoded." http://www.ietf.org/rfc/rfc2396.txt, sec 2.4.2
- atoms = [unquote(x) for x in quoted_slash.split(path)]
- path = "%2F".join(atoms)
-
- if path == "*":
- # This means, of course, that the last wsgi_app (shortest path)
- # will always handle a URI of "*".
- environ["SCRIPT_NAME"] = ""
- environ["PATH_INFO"] = "*"
- self.wsgi_app = server.mount_points[-1][1]
- else:
- for mount_point, wsgi_app in server.mount_points:
- # The mount_points list should be sorted by length, descending.
- if path.startswith(mount_point + "/") or path == mount_point:
- environ["SCRIPT_NAME"] = mount_point
- environ["PATH_INFO"] = path[len(mount_point):]
- self.wsgi_app = wsgi_app
- break
- else:
- self.simple_response("404 Not Found")
- return
-
- # Note that, like wsgiref and most other WSGI servers,
- # we unquote the path but not the query string.
- environ["QUERY_STRING"] = qs
-
- # Compare request and server HTTP protocol versions, in case our
- # server does not support the requested protocol. Limit our output
- # to min(req, server). We want the following output:
- # request server actual written supported response
- # protocol protocol response protocol feature set
- # a 1.0 1.0 1.0 1.0
- # b 1.0 1.1 1.1 1.0
- # c 1.1 1.0 1.0 1.0
- # d 1.1 1.1 1.1 1.1
- # Notice that, in (b), the response will be "HTTP/1.1" even though
- # the client only understands 1.0. RFC 2616 10.5.6 says we should
- # only return 505 if the _major_ version is different.
- rp = int(req_protocol[5]), int(req_protocol[7])
- sp = int(server.protocol[5]), int(server.protocol[7])
- if sp[0] != rp[0]:
- self.simple_response("505 HTTP Version Not Supported")
- return
- # Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
- environ["SERVER_PROTOCOL"] = req_protocol
- # set a non-standard environ entry so the WSGI app can know what
- # the *real* server protocol is (and what features to support).
- # See http://www.faqs.org/rfcs/rfc2145.html.
- environ["ACTUAL_SERVER_PROTOCOL"] = server.protocol
- self.response_protocol = "HTTP/%s.%s" % min(rp, sp)
-
- # If the Request-URI was an absoluteURI, use its location atom.
- if location:
- environ["SERVER_NAME"] = location
-
- # then all the http headers
- try:
- self.read_headers()
- except ValueError, ex:
- self.simple_response("400 Bad Request", repr(ex.args))
- return
-
- creds = environ.get("HTTP_AUTHORIZATION", "").split(" ", 1)
- environ["AUTH_TYPE"] = creds[0]
- if creds[0].lower() == 'basic':
- user, pw = base64.decodestring(creds[1]).split(":", 1)
- environ["REMOTE_USER"] = user
-
- # Persistent connection support
- if self.response_protocol == "HTTP/1.1":
- if environ.get("HTTP_CONNECTION", "") == "close":
- self.close_connection = True
- else:
- # HTTP/1.0
- if environ.get("HTTP_CONNECTION", "") != "Keep-Alive":
- self.close_connection = True
-
- # Transfer-Encoding support
- te = None
- if self.response_protocol == "HTTP/1.1":
- te = environ.get("HTTP_TRANSFER_ENCODING")
- if te:
- te = [x.strip().lower() for x in te.split(",") if x.strip()]
-
- read_chunked = False
-
- if te:
- for enc in te:
- if enc == "chunked":
- read_chunked = True
- else:
- # Note that, even if we see "chunked", we must reject
- # if there is an extension we don't recognize.
- self.simple_response("501 Unimplemented")
- self.close_connection = True
- return
-
- if read_chunked:
- if not self.decode_chunked():
- return
-
- # From PEP 333:
- # "Servers and gateways that implement HTTP 1.1 must provide
- # transparent support for HTTP 1.1's "expect/continue" mechanism.
- # This may be done in any of several ways:
- # 1. Respond to requests containing an Expect: 100-continue request
- # with an immediate "100 Continue" response, and proceed normally.
- # 2. Proceed with the request normally, but provide the application
- # with a wsgi.input stream that will send the "100 Continue"
- # response if/when the application first attempts to read from
- # the input stream. The read request must then remain blocked
- # until the client responds.
- # 3. Wait until the client decides that the server does not support
- # expect/continue, and sends the request body on its own.
- # (This is suboptimal, and is not recommended.)
- #
- # We used to do 3, but are now doing 1. Maybe we'll do 2 someday,
- # but it seems like it would be a big slowdown for such a rare case.
- if environ.get("HTTP_EXPECT", "") == "100-continue":
- self.simple_response(100)
-
- self.ready = True
-
- def read_headers(self):
- """Read header lines from the incoming stream."""
- environ = self.environ
-
- while True:
- line = self.rfile.readline()
- if not line:
- # No more data--illegal end of headers
- raise ValueError("Illegal end of headers.")
-
- if line == '\r\n':
- # Normal end of headers
- break
-
- if line[0] in ' \t':
- # It's a continuation line.
- v = line.strip()
- else:
- k, v = line.split(":", 1)
- k, v = k.strip().upper(), v.strip()
- envname = "HTTP_" + k.replace("-", "_")
-
- if k in comma_separated_headers:
- existing = environ.get(envname)
- if existing:
- v = ", ".join((existing, v))
- environ[envname] = v
-
- ct = environ.pop("HTTP_CONTENT_TYPE", None)
- if ct:
- environ["CONTENT_TYPE"] = ct
- cl = environ.pop("HTTP_CONTENT_LENGTH", None)
- if cl:
- environ["CONTENT_LENGTH"] = cl
-
- def decode_chunked(self):
- """Decode the 'chunked' transfer coding."""
- cl = 0
- data = StringIO.StringIO()
- while True:
- line = self.rfile.readline().strip().split(";", 1)
- chunk_size = int(line.pop(0), 16)
- if chunk_size <= 0:
- break
-## if line: chunk_extension = line[0]
- cl += chunk_size
- data.write(self.rfile.read(chunk_size))
- crlf = self.rfile.read(2)
- if crlf != "\r\n":
- self.simple_response("400 Bad Request",
- "Bad chunked transfer coding "
- "(expected '\\r\\n', got %r)" % crlf)
- return
-
- # Grab any trailer headers
- self.read_headers()
-
- data.seek(0)
- self.environ["wsgi.input"] = data
- self.environ["CONTENT_LENGTH"] = str(cl) or ""
- return True
-
- def respond(self):
- """Call the appropriate WSGI app and write its iterable output."""
- response = self.wsgi_app(self.environ, self.start_response)
- try:
- for chunk in response:
- # "The start_response callable must not actually transmit
- # the response headers. Instead, it must store them for the
- # server or gateway to transmit only after the first
- # iteration of the application return value that yields
- # a NON-EMPTY string, or upon the application's first
- # invocation of the write() callable." (PEP 333)
- if chunk:
- self.write(chunk)
- finally:
- if hasattr(response, "close"):
- response.close()
- if (self.ready and not self.sent_headers
- and not self.connection.server.interrupt):
- self.sent_headers = True
- self.send_headers()
- if self.chunked_write:
- self.sendall("0\r\n\r\n")
-
- def simple_response(self, status, msg=""):
- """Write a simple response back to the client."""
- status = str(status)
- buf = ["%s %s\r\n" % (self.connection.server.protocol, status),
- "Content-Length: %s\r\n" % len(msg)]
-
- if status[:3] == "413" and self.response_protocol == 'HTTP/1.1':
- # Request Entity Too Large
- self.close_connection = True
- buf.append("Connection: close\r\n")
-
- buf.append("\r\n")
- if msg:
- buf.append(msg)
- self.sendall("".join(buf))
-
- def start_response(self, status, headers, exc_info = None):
- """WSGI callable to begin the HTTP response."""
- if self.started_response:
- if not exc_info:
- raise AssertionError("WSGI start_response called a second "
- "time with no exc_info.")
- else:
- try:
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None
- self.started_response = True
- self.status = status
- self.outheaders.extend(headers)
- return self.write
-
- def write(self, chunk):
- """WSGI callable to write unbuffered data to the client.
-
- This method is also used internally by start_response (to write
- data from the iterable returned by the WSGI application).
- """
- if not self.started_response:
- raise AssertionError("WSGI write called before start_response.")
-
- if not self.sent_headers:
- self.sent_headers = True
- self.send_headers()
-
- if self.chunked_write and chunk:
- buf = [hex(len(chunk))[2:], "\r\n", chunk, "\r\n"]
- self.sendall("".join(buf))
- else:
- self.sendall(chunk)
-
- def send_headers(self):
- """Assert, process, and send the HTTP response message-headers."""
- hkeys = [key.lower() for key, value in self.outheaders]
- status = int(self.status[:3])
-
- if status == 413:
- # Request Entity Too Large. Close conn to avoid garbage.
- self.close_connection = True
- elif "content-length" not in hkeys:
- # "All 1xx (informational), 204 (no content),
- # and 304 (not modified) responses MUST NOT
- # include a message-body." So no point chunking.
- if status < 200 or status in (204, 205, 304):
- pass
- else:
- if self.response_protocol == 'HTTP/1.1':
- # Use the chunked transfer-coding
- self.chunked_write = True
- self.outheaders.append(("Transfer-Encoding", "chunked"))
- else:
- # Closing the conn is the only way to determine len.
- self.close_connection = True
-
- if "connection" not in hkeys:
- if self.response_protocol == 'HTTP/1.1':
- if self.close_connection:
- self.outheaders.append(("Connection", "close"))
- else:
- if not self.close_connection:
- self.outheaders.append(("Connection", "Keep-Alive"))
-
- if "date" not in hkeys:
- self.outheaders.append(("Date", rfc822.formatdate()))
-
- server = self.connection.server
-
- if "server" not in hkeys:
- self.outheaders.append(("Server", server.version))
-
- buf = [server.protocol, " ", self.status, "\r\n"]
- try:
- buf += [k + ": " + v + "\r\n" for k, v in self.outheaders]
- except TypeError:
- if not isinstance(k, str):
- raise TypeError("WSGI response header key %r is not a string.")
- if not isinstance(v, str):
- raise TypeError("WSGI response header value %r is not a string.")
- else:
- raise
- buf.append("\r\n")
- self.sendall("".join(buf))
-
-
-class NoSSLError(Exception):
- """Exception raised when a client speaks HTTP to an HTTPS socket."""
- pass
-
-
-def _ssl_wrap_method(method, is_reader=False):
- """Wrap the given method with SSL error-trapping.
-
- is_reader: if False (the default), EOF errors will be raised.
- If True, EOF errors will return "" (to emulate normal sockets).
- """
- def ssl_method_wrapper(self, *args, **kwargs):
-## print (id(self), method, args, kwargs)
- start = time.time()
- while True:
- try:
- return method(self, *args, **kwargs)
- except (SSL.WantReadError, SSL.WantWriteError):
- # Sleep and try again. This is dangerous, because it means
- # the rest of the stack has no way of differentiating
- # between a "new handshake" error and "client dropped".
- # Note this isn't an endless loop: there's a timeout below.
- time.sleep(self.ssl_retry)
- except SSL.SysCallError, e:
- if is_reader and e.args == (-1, 'Unexpected EOF'):
- return ""
-
- errno = e.args[0]
- if is_reader and errno in socket_errors_to_ignore:
- return ""
- raise socket.error(errno)
- except SSL.Error, e:
- if is_reader and e.args == (-1, 'Unexpected EOF'):
- return ""
-
- thirdarg = None
- try:
- thirdarg = e.args[0][0][2]
- except IndexError:
- pass
-
- if is_reader and thirdarg == 'ssl handshake failure':
- return ""
- if thirdarg == 'http request':
- # The client is talking HTTP to an HTTPS server.
- raise NoSSLError()
- raise
- if time.time() - start > self.ssl_timeout:
- raise socket.timeout("timed out")
- return ssl_method_wrapper
-
-class SSL_fileobject(socket._fileobject):
- """Faux file object attached to a socket object."""
-
- ssl_timeout = 3
- ssl_retry = .01
-
- close = _ssl_wrap_method(socket._fileobject.close)
- flush = _ssl_wrap_method(socket._fileobject.flush)
- write = _ssl_wrap_method(socket._fileobject.write)
- writelines = _ssl_wrap_method(socket._fileobject.writelines)
- read = _ssl_wrap_method(socket._fileobject.read, is_reader=True)
- readline = _ssl_wrap_method(socket._fileobject.readline, is_reader=True)
- readlines = _ssl_wrap_method(socket._fileobject.readlines, is_reader=True)
-
-
-class HTTPConnection(object):
- """An HTTP connection (active socket).
-
- socket: the raw socket object (usually TCP) for this connection.
- addr: the "bind address" for the remote end of the socket.
- For IP sockets, this is a tuple of (REMOTE_ADDR, REMOTE_PORT).
- For UNIX domain sockets, this will be a string.
- server: the HTTP Server for this Connection. Usually, the server
- object possesses a passive (server) socket which spawns multiple,
- active (client) sockets, one for each connection.
-
- environ: a WSGI environ template. This will be copied for each request.
- rfile: a fileobject for reading from the socket.
- sendall: a function for writing (+ flush) to the socket.
- """
-
- rbufsize = -1
- RequestHandlerClass = HTTPRequest
- environ = {"wsgi.version": (1, 0),
- "wsgi.url_scheme": "http",
- "wsgi.multithread": True,
- "wsgi.multiprocess": False,
- "wsgi.run_once": False,
- "wsgi.errors": sys.stderr,
- }
-
- def __init__(self, sock, addr, server):
- self.socket = sock
- self.addr = addr
- self.server = server
- self.__aborted = False
-
- # Copy the class environ into self.
- self.environ = self.environ.copy()
-
- if SSL and isinstance(sock, SSL.ConnectionType):
- timeout = sock.gettimeout()
- self.rfile = SSL_fileobject(sock, "r", self.rbufsize)
- self.rfile.ssl_timeout = timeout
- self.sendall = _ssl_wrap_method(sock.sendall)
- self.environ["wsgi.url_scheme"] = "https"
- self.environ["HTTPS"] = "on"
- sslenv = getattr(server, "ssl_environ", None)
- if sslenv:
- self.environ.update(sslenv)
- else:
- self.rfile = sock.makefile("rb", self.rbufsize)
- self.sendall = sock.sendall
-
- self.environ.update({"wsgi.input": self.rfile,
- "SERVER_NAME": self.server.server_name,
- })
-
- if isinstance(self.server.bind_addr, basestring):
- # AF_UNIX. This isn't really allowed by WSGI, which doesn't
- # address unix domain sockets. But it's better than nothing.
- self.environ["SERVER_PORT"] = ""
- else:
- self.environ["SERVER_PORT"] = str(self.server.bind_addr[1])
- # optional values
- # Until we do DNS lookups, omit REMOTE_HOST
- self.environ["REMOTE_ADDR"] = self.addr[0]
- self.environ["REMOTE_PORT"] = str(self.addr[1])
-
- def abort(self):
- self.__aborted = True
-
- def communicate(self):
- """Read each request and respond appropriately."""
- try:
- while True:
- if self.__aborted:
- return
-
- # (re)set req to None so that if something goes wrong in
- # the RequestHandlerClass constructor, the error doesn't
- # get written to the previous request.
- req = None
- req = self.RequestHandlerClass(self)
- # This order of operations should guarantee correct pipelining.
- req.parse_request()
-
- if not req.ready:
- return
- req.respond()
- if req.close_connection:
- return
- except socket.error, e:
- errno = e.args[0]
- if errno not in socket_errors_to_ignore:
- if req:
- req.simple_response("500 Internal Server Error",
- format_exc())
- return
- except (KeyboardInterrupt, SystemExit):
- raise
- except NoSSLError:
- # Unwrap our sendall
- req.sendall = self.socket._sock.sendall
- req.simple_response("400 Bad Request",
- "The client sent a plain HTTP request, but "
- "this server only speaks HTTPS on this port.")
- except:
- if req:
- req.simple_response("500 Internal Server Error", format_exc())
-
- def close(self):
- """Close the socket underlying this connection."""
- self.rfile.close()
- self.socket.close()
-
-
-def format_exc(limit=None):
- """Like print_exc() but return a string. Backport for Python 2.3."""
- try:
- etype, value, tb = sys.exc_info()
- return ''.join(traceback.format_exception(etype, value, tb, limit))
- finally:
- etype = value = tb = None
-
-
-_SHUTDOWNREQUEST = None
-
-class WorkerThread(threading.Thread):
- """Thread which continuously polls a Queue for Connection objects.
-
- server: the HTTP Server which spawned this thread, and which owns the
- Queue and is placing active connections into it.
- ready: a simple flag for the calling server to know when this thread
- has begun polling the Queue.
-
- Due to the timing issues of polling a Queue, a WorkerThread does not
- check its own 'ready' flag after it has started. To stop the thread,
- it is necessary to stick a _SHUTDOWNREQUEST object onto the Queue
- (one for each running WorkerThread).
- """
-
- def __init__(self, server):
- self.ready = False
- self.server = server
- self.currentConnection = None
- threading.Thread.__init__(self)
-
- def run(self):
- try:
- self.ready = True
- while True:
- conn = self.server.requests.get()
- if conn is _SHUTDOWNREQUEST:
- return
-
- self.currentConnection = conn
-
- try:
- conn.communicate()
- finally:
- conn.close()
- except (KeyboardInterrupt, SystemExit), exc:
- self.server.interrupt = exc
-
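The shutdown protocol in the docstring is worth seeing in isolation: each blocked Queue.get() consumes exactly one item, so one _SHUTDOWNREQUEST per worker wakes every thread exactly once. A standalone sketch of the pattern (names here are illustrative, not the classes above):

import Queue
import threading

SHUTDOWN = None  # plays the role of _SHUTDOWNREQUEST

def poll(q):
    while True:
        item = q.get()
        if item is SHUTDOWN:  # each sentinel stops exactly one worker
            return

q = Queue.Queue()
workers = [threading.Thread(target=poll, args=(q,)) for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    q.put(SHUTDOWN)  # one sentinel per running worker
for w in workers:
    w.join()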
-
-class SSLConnection:
- """A thread-safe wrapper for an SSL.Connection.
-
- *args: the arguments to create the wrapped SSL.Connection(*args).
- """
-
- def __init__(self, *args):
- self._ssl_conn = SSL.Connection(*args)
- self._lock = threading.RLock()
-
- for f in ('get_context', 'pending', 'send', 'write', 'recv', 'read',
- 'renegotiate', 'bind', 'listen', 'connect', 'accept',
- 'setblocking', 'fileno', 'shutdown', 'close', 'get_cipher_list',
- 'getpeername', 'getsockname', 'getsockopt', 'setsockopt',
- 'makefile', 'get_app_data', 'set_app_data', 'state_string',
- 'sock_shutdown', 'get_peer_certificate', 'want_read',
- 'want_write', 'set_connect_state', 'set_accept_state',
- 'connect_ex', 'sendall', 'settimeout'):
- exec """def %s(self, *args):
- self._lock.acquire()
- try:
- return self._ssl_conn.%s(*args)
- finally:
- self._lock.release()
-""" % (f, f)
-
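The exec loop above stamps out one lock-wrapped method per name at class-definition time, which keeps per-call overhead low. A sketch of the same serialization done dynamically with __getattr__ instead; LockedProxy is a hypothetical name, not part of this module:

import threading

class LockedProxy(object):
    # Serialize all method calls on the wrapped object with a single RLock.
    def __init__(self, obj):
        self._obj = obj
        self._lock = threading.RLock()

    def __getattr__(self, name):
        method = getattr(self._obj, name)
        def locked(*args, **kwargs):
            self._lock.acquire()
            try:
                return method(*args, **kwargs)
            finally:
                self._lock.release()
        return locked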
-
-class CherryPyWSGIServer(object):
- """An HTTP server for WSGI.
-
- bind_addr: a (host, port) tuple if TCP sockets are desired;
- for UNIX sockets, supply the filename as a string.
- wsgi_app: the WSGI 'application callable'; multiple WSGI applications
- may be passed as (script_name, callable) pairs.
- numthreads: the number of worker threads to create (default 10).
- server_name: the string to set for WSGI's SERVER_NAME environ entry.
- Defaults to socket.gethostname().
- max: the maximum number of queued requests (defaults to -1 = no limit).
- request_queue_size: the 'backlog' argument to socket.listen();
- specifies the maximum number of queued connections (default 5).
- timeout: the timeout in seconds for accepted connections (default 10).
-
- protocol: the version string to write in the Status-Line of all
- HTTP responses. For example, "HTTP/1.1" (the default). This
- also limits the supported features used in the response.
-
-
- SSL/HTTPS
- ---------
- The OpenSSL module must be importable for SSL functionality.
- You can obtain it from http://pyopenssl.sourceforge.net/
-
- ssl_certificate: the filename of the server SSL certificate.
- ssl_privatekey: the filename of the server's private key file.
-
- If either of these is None (both are None by default), this server
- will not use SSL. If both are given and are valid, they will be read
- on server start and used in the SSL context for the listening socket.
- """
-
- protocol = "HTTP/1.1"
- version = "CherryPy/3.0.3"
- ready = False
- _interrupt = None
- ConnectionClass = HTTPConnection
-
- # Paths to certificate and private key files
- ssl_certificate = None
- ssl_private_key = None
-
- def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
- max=-1, request_queue_size=5, timeout=10):
- self.requests = Queue.Queue(max)
-
- if callable(wsgi_app):
- # We've been handed a single wsgi_app, in CP-2.1 style.
- # Assume it's mounted at "".
- self.mount_points = [("", wsgi_app)]
- else:
- # We've been handed a list of (mount_point, wsgi_app) tuples,
- # so that the server can call different wsgi_apps, and also
- # correctly set SCRIPT_NAME.
- self.mount_points = wsgi_app
- self.mount_points.sort()
- self.mount_points.reverse()
-
- self.bind_addr = bind_addr
- self.numthreads = numthreads or 1
- if not server_name:
- server_name = socket.gethostname()
- self.server_name = server_name
- self.request_queue_size = request_queue_size
- self._workerThreads = []
-
- self.timeout = timeout
-
- def start(self):
- """Run the server forever."""
- # We don't have to trap KeyboardInterrupt or SystemExit here,
- # because cherrypy.server already does so, calling self.stop() for us.
- # If you're using this server with another framework, you should
- # trap those exceptions in whatever code block calls start().
- self._interrupt = None
-
- # Select the appropriate socket
- if isinstance(self.bind_addr, basestring):
- # AF_UNIX socket
-
- # So we can reuse the socket...
- try: os.unlink(self.bind_addr)
- except: pass
-
- # So everyone can access the socket...
- try: os.chmod(self.bind_addr, 0777)
- except: pass
-
- info = [(socket.AF_UNIX, socket.SOCK_STREAM, 0, "", self.bind_addr)]
- else:
- # AF_INET or AF_INET6 socket
- # Get the correct address family for our host (allows IPv6 addresses)
- host, port = self.bind_addr
- flags = 0
- if host == '':
- # Despite the socket module docs, using '' does not
- # allow AI_PASSIVE to work. Passing None instead
- # returns '0.0.0.0' like we want. In other words:
- # host AI_PASSIVE result
- # '' Y 192.168.x.y
- # '' N 192.168.x.y
- # None Y 0.0.0.0
- # None N 127.0.0.1
- host = None
- flags = socket.AI_PASSIVE
- try:
- info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
- socket.SOCK_STREAM, 0, flags)
- except socket.gaierror:
- # Probably a DNS issue. Assume IPv4.
- info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", self.bind_addr)]
-
- self.socket = None
- msg = "No socket could be created"
- for res in info:
- af, socktype, proto, canonname, sa = res
- try:
- self.bind(af, socktype, proto)
- except socket.error, msg:
- if self.socket:
- self.socket.close()
- self.socket = None
- continue
- break
- if not self.socket:
- raise socket.error, msg
-
- # Timeout so KeyboardInterrupt can be caught on Win32
- self.socket.settimeout(1)
- self.socket.listen(self.request_queue_size)
-
- # Create worker threads
- for i in xrange(self.numthreads):
- self._workerThreads.append(WorkerThread(self))
- for worker in self._workerThreads:
- worker.setName("CP WSGIServer " + worker.getName())
- worker.start()
- for worker in self._workerThreads:
- while not worker.ready:
- time.sleep(.1)
-
- self.ready = True
- while self.ready:
- self.tick()
- if self.interrupt:
- while self.interrupt is True:
- # Wait for self.stop() to complete. See _set_interrupt.
- time.sleep(0.1)
- raise self.interrupt
-
- def bind(self, family, type, proto=0):
- """Create (or recreate) the actual socket object."""
- self.socket = socket.socket(family, type, proto)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-## self.socket.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1)
- if self.ssl_certificate and self.ssl_private_key:
- if SSL is None:
- raise ImportError("You must install pyOpenSSL to use HTTPS.")
-
- # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- ctx.use_privatekey_file(self.ssl_private_key)
- ctx.use_certificate_file(self.ssl_certificate)
- self.socket = SSLConnection(ctx, self.socket)
- self.populate_ssl_environ()
- self.socket.bind(self.bind_addr)
-
- def tick(self):
- """Accept a new connection and put it on the Queue."""
- try:
- s, addr = self.socket.accept()
- if not self.ready:
- return
- if hasattr(s, 'settimeout'):
- s.settimeout(self.timeout)
- conn = self.ConnectionClass(s, addr, self)
- self.requests.put(conn)
- except socket.timeout:
- # The only reason for the timeout in start() is so we can
- # notice keyboard interrupts on Win32, which don't interrupt
- # accept() by default
- return
- except socket.error, x:
- msg = x.args[1]
- if msg in ("Bad file descriptor", "Socket operation on non-socket"):
- # Our socket was closed.
- return
- if msg == "Resource temporarily unavailable":
- # Just try again. See http://www.cherrypy.org/ticket/479.
- return
- raise
-
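The socket.timeout branch exists only because start() puts a 1-second timeout on the listening socket so KeyboardInterrupt can surface on Win32. The pattern in isolation, assuming a bound listening socket lsock, a serving flag, and a hypothetical handle_connection:

lsock.settimeout(1)
while serving:
    try:
        s, addr = lsock.accept()
    except socket.timeout:
        continue  # loop around so Ctrl-C can be noticed on Win32
    handle_connection(s, addr)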
- def _get_interrupt(self):
- return self._interrupt
- def _set_interrupt(self, interrupt):
- self._interrupt = True
- self.stop()
- self._interrupt = interrupt
- interrupt = property(_get_interrupt, _set_interrupt,
- doc="Set this to an Exception instance to "
- "interrupt the server.")
-
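As the property doc says, assigning an exception is the supported cross-thread shutdown path; WorkerThread.run uses it for KeyboardInterrupt and SystemExit. From any thread:

server.interrupt = SystemExit("shutting down")
# _set_interrupt calls stop(), and the loop in start() then re-raises the
# exception in the thread that is running start().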
- def stop(self):
- """Gracefully shutdown a server that is serving forever."""
- self.ready = False
-
- sock = getattr(self, "socket", None)
- if sock:
- if not isinstance(self.bind_addr, basestring):
- # Touch our own socket to make accept() return immediately.
- try:
- host, port = sock.getsockname()[:2]
- except socket.error, x:
- if x.args[1] != "Bad file descriptor":
- raise
- else:
- # Note that we're explicitly NOT using AI_PASSIVE,
- # here, because we want an actual IP to touch.
- # localhost won't work if we've bound to a public IP,
- # but it would if we bound to INADDR_ANY via host = ''.
- for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
- socket.SOCK_STREAM):
- af, socktype, proto, canonname, sa = res
- s = None
- try:
- s = socket.socket(af, socktype, proto)
- # See http://groups.google.com/group/cherrypy-users/
- # browse_frm/thread/bbfe5eb39c904fe0
- s.settimeout(1.0)
- s.connect((host, port))
- s.close()
- except socket.error:
- if s:
- s.close()
- if hasattr(sock, "close"):
- sock.close()
- self.socket = None
-
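The "touch" above is the standard trick for unblocking a thread stuck in accept(): make one short-lived connection to our own listening address. Reduced to its essentials (AF_INET only, for brevity), assuming the server is bound to (host, port):

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1.0)
try:
    s.connect((host, port))  # accept() in tick() returns immediately
finally:
    s.close()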
- # Must shut down threads here so the code that calls
- # this method can know when all threads are stopped.
- for worker in self._workerThreads:
- self.requests.put(_SHUTDOWNREQUEST)
-
- # Don't join currentThread (when stop is called inside a request).
- current = threading.currentThread()
- while self._workerThreads:
- worker = self._workerThreads.pop()
- if worker is not current and worker.isAlive():
- try:
- if worker.currentConnection:
- worker.currentConnection.abort()
- worker.join()
- except AssertionError:
- pass
-
- def populate_ssl_environ(self):
- """Create WSGI environ entries to be merged into each request."""
- cert = open(self.ssl_certificate).read()
- cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
- self.ssl_environ = {
- # pyOpenSSL doesn't provide access to any of these AFAICT
-## 'SSL_PROTOCOL': 'SSLv2',
-## SSL_CIPHER string The cipher specification name
-## SSL_VERSION_INTERFACE string The mod_ssl program version
-## SSL_VERSION_LIBRARY string The OpenSSL program version
- }
-
- # Server certificate attributes
- self.ssl_environ.update({
- 'SSL_SERVER_M_VERSION': cert.get_version(),
- 'SSL_SERVER_M_SERIAL': cert.get_serial_number(),
-## 'SSL_SERVER_V_START': Validity of server's certificate (start time),
-## 'SSL_SERVER_V_END': Validity of server's certificate (end time),
- })
-
- for prefix, dn in [("I", cert.get_issuer()),
- ("S", cert.get_subject())]:
- # X509Name objects don't seem to have a way to get the
- # complete DN string. Use str() and slice it instead,
- # because str(dn) == "<X509Name object '/C=US/ST=...'>"
- dnstr = str(dn)[18:-2]
-
- wsgikey = 'SSL_SERVER_%s_DN' % prefix
- self.ssl_environ[wsgikey] = dnstr
-
- # The DN should be of the form: /k1=v1/k2=v2, but we must allow
- # for any value to contain slashes itself (in a URL).
- while dnstr:
- pos = dnstr.rfind("=")
- dnstr, value = dnstr[:pos], dnstr[pos + 1:]
- pos = dnstr.rfind("/")
- dnstr, key = dnstr[:pos], dnstr[pos + 1:]
- if key and value:
- wsgikey = 'SSL_SERVER_%s_DN_%s' % (prefix, key)
- self.ssl_environ[wsgikey] = value
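The right-to-left split above is easier to follow on a concrete value. A sketch with a hypothetical DN, already sliced out of str(dn):

dnstr = "/C=US/ST=NC/O=Example/CN=mail.example.net"  # hypothetical
entries = {}
while dnstr:
    pos = dnstr.rfind("=")
    dnstr, value = dnstr[:pos], dnstr[pos + 1:]
    pos = dnstr.rfind("/")
    dnstr, key = dnstr[:pos], dnstr[pos + 1:]
    entries[key] = value
# entries == {'C': 'US', 'ST': 'NC', 'O': 'Example', 'CN': 'mail.example.net'}
# Taking "=" from the right first is what lets a value contain "/" (e.g. a URL).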
rhmessaging commits: r3617 - mgmt/trunk/cumin/python/cumin/messaging.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-09-03 13:40:08 -0400 (Thu, 03 Sep 2009)
New Revision: 3617
Modified:
mgmt/trunk/cumin/python/cumin/messaging/connection.py
Log:
Fix a crash due to sorting on the wrong column name
Modified: mgmt/trunk/cumin/python/cumin/messaging/connection.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/connection.py 2009-09-03 16:04:50 UTC (rev 3616)
+++ mgmt/trunk/cumin/python/cumin/messaging/connection.py 2009-09-03 17:40:08 UTC (rev 3617)
@@ -215,7 +215,7 @@
col = self.ExpiresColumn(app, "expires")
self.add_column(col)
- col = self.StatusColumn(app, "status")
+ col = self.StatusColumn(app, "attached")
self.add_column(col)
self.__phase = PhaseSwitch(app, "phase")
rhmessaging commits: r3616 - in mgmt/trunk/mint: sql and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-09-03 12:04:50 -0400 (Thu, 03 Sep 2009)
New Revision: 3616
Modified:
mgmt/trunk/mint/python/mint/schema.py
mgmt/trunk/mint/python/mint/schemaparser.py
mgmt/trunk/mint/sql/schema.sql
Log:
* Remove the classInfos attr from schema objects; it's no longer used
* Make the parser blow up if an objId attr doesn't have a references value
* Regenerate the schema
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2009-09-03 15:45:27 UTC (rev 3615)
+++ mgmt/trunk/mint/python/mint/schema.py 2009-09-03 16:04:50 UTC (rev 3616)
@@ -26,7 +26,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('SlotStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
Pool = StringCol(length=1000, default=None)
System = StringCol(length=1000, default=None)
AccountingGroup = StringCol(length=1000, default=None)
@@ -39,7 +38,6 @@
Disk = BigIntCol(default=None)
FileSystemDomain = StringCol(length=1000, default=None)
GlobalJobId = StringCol(length=1000, default=None)
- ImageSize = BigIntCol(default=None)
IsValidCheckpointPlatform = StringCol(length=4000, default=None)
JobId = StringCol(length=1000, default=None)
JobStart = TimestampCol(default=None)
@@ -88,7 +86,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
slot = ForeignKey('Slot', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
Activity = StringCol(length=1000, default=None)
ClockDay = BigIntCol(default=None)
ClockMin = BigIntCol(default=None)
@@ -96,6 +93,7 @@
ConsoleIdle = BigIntCol(default=None)
EnteredCurrentActivity = TimestampCol(default=None)
EnteredCurrentState = TimestampCol(default=None)
+ ImageSize = BigIntCol(default=None)
KeyboardIdle = BigIntCol(default=None)
LastBenchmark = TimestampCol(default=None)
LastFetchWorkCompleted = TimestampCol(default=None)
@@ -149,7 +147,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('JobStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
scheduler = ForeignKey('Scheduler', cascade='null', default=None)
submitter = ForeignKey('Submitter', cascade='null', default=None)
AccountingGroup = StringCol(length=1000, default=None)
@@ -241,7 +238,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
job = ForeignKey('Job', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
@@ -263,7 +259,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('SchedulerStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
Pool = StringCol(length=1000, default=None)
System = StringCol(length=1000, default=None)
JobQueueBirthdate = TimestampCol(default=None)
@@ -393,7 +388,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
scheduler = ForeignKey('Scheduler', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
NumUsers = BigIntCol(default=None)
TotalHeldJobs = BigIntCol(default=None)
TotalIdleJobs = BigIntCol(default=None)
@@ -428,7 +422,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('SubmitterStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
scheduler = ForeignKey('Scheduler', cascade='null', default=None)
JobQueueBirthdate = TimestampCol(default=None)
Machine = StringCol(length=1000, default=None)
@@ -441,7 +434,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
submitter = ForeignKey('Submitter', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
HeldJobs = BigIntCol(default=None)
IdleJobs = BigIntCol(default=None)
RunningJobs = BigIntCol(default=None)
@@ -466,7 +458,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('NegotiatorStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
Pool = StringCol(length=1000, default=None)
System = StringCol(length=1000, default=None)
Name = StringCol(length=1000, default=None)
@@ -567,7 +558,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
negotiator = ForeignKey('Negotiator', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
MonitorSelfAge = BigIntCol(default=None)
MonitorSelfCPUUsage = FloatCol(default=None)
@@ -596,7 +586,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('CollectorStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
Pool = StringCol(length=1000, default=None)
System = StringCol(length=1000, default=None)
CondorPlatform = StringCol(length=1000, default=None)
@@ -610,7 +599,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
collector = ForeignKey('Collector', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
RunningJobs = BigIntCol(default=None)
IdleJobs = BigIntCol(default=None)
HostsTotal = BigIntCol(default=None)
@@ -638,7 +626,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('MasterStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
Pool = StringCol(length=1000, default=None)
System = StringCol(length=1000, default=None)
Name = StringCol(length=1000, default=None)
@@ -673,7 +660,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
master = ForeignKey('Master', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
MonitorSelfAge = BigIntCol(default=None)
MonitorSelfCPUUsage = FloatCol(default=None)
@@ -702,7 +688,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('GridStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
Pool = StringCol(length=1000, default=None)
Name = StringCol(length=1000, default=None)
ScheddName = StringCol(length=1000, default=None)
@@ -717,7 +702,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
grid = ForeignKey('Grid', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
NumJobs = BigIntCol(default=None)
SubmitsInProgress = BigIntCol(default=None)
SubmitsQueued = BigIntCol(default=None)
@@ -746,7 +730,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('SubmissionStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
scheduler = ForeignKey('Scheduler', cascade='null', default=None)
submitter = ForeignKey('Submitter', cascade='null', default=None)
Name = StringCol(length=1000, default=None)
@@ -757,7 +740,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
submission = ForeignKey('Submission', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
Idle = BigIntCol(default=None)
Running = BigIntCol(default=None)
Removed = BigIntCol(default=None)
@@ -784,7 +766,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('AclStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
broker = ForeignKey('Broker', cascade='null', default=None)
policyFile = StringCol(length=1000, default=None)
enforcingAcl = BoolCol(default=None)
@@ -804,7 +785,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
acl = ForeignKey('Acl', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
aclDenyCount = BigIntCol(default=None)
@@ -827,7 +807,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('ClusterStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
broker = ForeignKey('Broker', cascade='null', default=None)
clusterName = StringCol(length=1000, default=None)
clusterID = StringCol(length=1000, default=None)
@@ -858,7 +837,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
cluster = ForeignKey('Cluster', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
@@ -880,7 +858,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('StoreStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
broker = ForeignKey('Broker', cascade='null', default=None)
location = StringCol(length=1000, default=None)
defaultInitialFileCount = IntCol(default=None)
@@ -899,7 +876,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
store = ForeignKey('Store', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
tplTransactionDepth = BigIntCol(default=None)
tplTransactionDepthLow = BigIntCol(default=None)
tplTransactionDepthHigh = BigIntCol(default=None)
@@ -930,7 +906,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('JournalStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
queue = ForeignKey('Queue', cascade='null', default=None)
name = StringCol(length=1000, default=None)
directory = StringCol(length=1000, default=None)
@@ -960,7 +935,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
journal = ForeignKey('Journal', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
recordDepth = BigIntCol(default=None)
recordDepthLow = BigIntCol(default=None)
recordDepthHigh = BigIntCol(default=None)
@@ -1011,7 +985,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('SystemStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
systemId = BLOBCol(default=None)
osName = StringCol(length=1000, default=None)
nodeName = StringCol(length=1000, default=None)
@@ -1025,7 +998,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
system = ForeignKey('System', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
@@ -1047,7 +1019,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('BrokerStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
system = ForeignKey('System', cascade='null', default=None)
port = BigIntCol(default=None)
workerThreads = IntCol(default=None)
@@ -1109,7 +1080,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
broker = ForeignKey('Broker', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
uptime = BigIntCol(default=None)
@@ -1132,7 +1102,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('AgentStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
clientConnection = ForeignKey('ClientConnection', cascade='null', default=None)
label = StringCol(length=1000, default=None)
broker = ForeignKey('Broker', cascade='null', default=None)
@@ -1146,7 +1115,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
agent = ForeignKey('Agent', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
@@ -1168,7 +1136,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('VhostStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
broker = ForeignKey('Broker', cascade='null', default=None)
name = StringCol(length=1000, default=None)
federationTag = StringCol(length=1000, default=None)
@@ -1179,7 +1146,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
vhost = ForeignKey('Vhost', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
@@ -1201,7 +1167,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('QueueStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
vhost = ForeignKey('Vhost', cascade='null', default=None)
name = StringCol(length=1000, default=None)
durable = BoolCol(default=None)
@@ -1224,7 +1189,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
queue = ForeignKey('Queue', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
msgTotalEnqueues = BigIntCol(default=None)
msgTotalDequeues = BigIntCol(default=None)
msgTxnEnqueues = BigIntCol(default=None)
@@ -1273,11 +1237,12 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('ExchangeStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
vhost = ForeignKey('Vhost', cascade='null', default=None)
name = StringCol(length=1000, default=None)
type = StringCol(length=1000, default=None)
durable = BoolCol(default=None)
+ autoDelete = BoolCol(default=None)
+ exchange = ForeignKey('Exchange', cascade='null', default=None)
arguments = PickleCol(default=None)
@@ -1286,7 +1251,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
exchange = ForeignKey('Exchange', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
producerCount = BigIntCol(default=None)
producerCountLow = BigIntCol(default=None)
producerCountHigh = BigIntCol(default=None)
@@ -1320,7 +1284,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('BindingStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
exchange = ForeignKey('Exchange', cascade='null', default=None)
queue = ForeignKey('Queue', cascade='null', default=None)
bindingKey = StringCol(length=1000, default=None)
@@ -1333,7 +1296,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
binding = ForeignKey('Binding', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
msgMatched = BigIntCol(default=None)
@@ -1356,7 +1318,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('ClientConnectionStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
vhost = ForeignKey('Vhost', cascade='null', default=None)
address = StringCol(length=1000, default=None)
incoming = BoolCol(default=None)
@@ -1379,7 +1340,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
clientConnection = ForeignKey('ClientConnection', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
closing = BoolCol(default=None)
framesFromClient = BigIntCol(default=None)
framesToClient = BigIntCol(default=None)
@@ -1406,7 +1366,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('LinkStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
vhost = ForeignKey('Vhost', cascade='null', default=None)
host = StringCol(length=1000, default=None)
port = BigIntCol(default=None)
@@ -1452,7 +1411,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
link = ForeignKey('Link', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
state = StringCol(length=1000, default=None)
lastError = StringCol(length=1000, default=None)
@@ -1476,7 +1434,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('BridgeStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
link = ForeignKey('Link', cascade='null', default=None)
channelId = IntCol(default=None)
durable = BoolCol(default=None)
@@ -1502,7 +1459,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
bridge = ForeignKey('Bridge', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
@@ -1524,7 +1480,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('SessionStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
vhost = ForeignKey('Vhost', cascade='null', default=None)
name = StringCol(length=1000, default=None)
channelId = IntCol(default=None)
@@ -1564,7 +1519,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
session = ForeignKey('Session', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
framesOutstanding = BigIntCol(default=None)
TxnStarts = BigIntCol(default=None)
TxnCommits = BigIntCol(default=None)
@@ -1592,7 +1546,6 @@
statsCurrIndex = DatabaseIndex(statsCurr)
statsPrev = ForeignKey('SysimageStats', cascade='null', default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
- classInfos = dict() # brokerId => classInfo
uuid = BLOBCol(default=None)
osName = StringCol(length=1000, default=None)
nodeName = StringCol(length=1000, default=None)
@@ -1609,7 +1562,6 @@
lazyUpdate = True
qmfUpdateTime = TimestampCol(default=None)
sysimage = ForeignKey('Sysimage', cascade='null', default=None)
- classInfos = dict() # brokerId => classInfo
memFree = BigIntCol(default=None)
swapFree = BigIntCol(default=None)
loadAverage1Min = FloatCol(default=None)
@@ -1764,7 +1716,9 @@
Vhost.sqlmeta.addJoin(SQLMultipleJoin('Exchange', joinMethodName='exchanges'))
+Exchange.sqlmeta.addJoin(SQLMultipleJoin('Exchange', joinMethodName='exchanges'))
+
Exchange.sqlmeta.addJoin(SQLMultipleJoin('ExchangeStats', joinMethodName='stats'))
classToSchemaNameMap['Binding'] = 'Binding'
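Together with the new exchange ForeignKey in the schema above, the added self-join gives each Exchange row a collection of child exchanges. A sketch of traversing the regenerated SQLObject model (the id is hypothetical):

parent = Exchange.get(1)        # hypothetical row id
for child in parent.exchanges:  # the SQLMultipleJoin added above
    print child.name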
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2009-09-03 15:45:27 UTC (rev 3615)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2009-09-03 16:04:50 UTC (rev 3616)
@@ -117,6 +117,8 @@
elemName = self.renameReservedWord(elem["@name"])
if (elem["@type"] == "objId"):
reference = elem["@references"]
+ if not reference:
+ raise Exception("Attribute of objId type is missing references value")
#XXX: TO-DO: properly handle namespaces
# handle cases where the referenced class is in a different namespace (ie, contains a "." or a ":");
# for now, discard namespace
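The new guard turns a silently missing reference into a hard failure at parse time. In isolation, with a hypothetical element in the dict form the parser uses:

elem = {"@name": "brokerRef", "@type": "objId", "@references": None}  # hypothetical
if elem["@type"] == "objId":
    reference = elem["@references"]
    if not reference:
        raise Exception("Attribute of objId type is missing references value")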
@@ -176,8 +178,6 @@
self.generateForeignKeyAttribWithIndex("statsPrev", statsPythonName)
self.finalPythonOutput += "classToSchemaNameMap['%s'] = '%s'\n" % (pythonName, schemaName)
self.finalPythonOutput += "schemaNameToClassMap['%s'] = %s\n" % (schemaName, pythonName)
- # XXX get rid of this
- self.pythonOutput += " classInfos = dict() # brokerId => classInfo\n"
def generateMethod(self, elem):
if (elem["@desc"] != None):
Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql 2009-09-03 15:45:27 UTC (rev 3615)
+++ mgmt/trunk/mint/sql/schema.sql 2009-09-03 16:04:50 UTC (rev 3616)
@@ -10,20 +10,6 @@
);
CREATE UNIQUE INDEX broker_group_mapping_unique ON broker_group_mapping (broker_id, broker_group_id);
-CREATE TABLE broker_registration (
- id SERIAL PRIMARY KEY,
- name VARCHAR(1000) NOT NULL UNIQUE,
- url VARCHAR(1000),
- broker_id INT
-);
-CREATE UNIQUE INDEX broker_registration_url_unique ON broker_registration (url);
-
-CREATE TABLE collector_registration (
- id SERIAL PRIMARY KEY,
- name VARCHAR(1000),
- collector_id VARCHAR(1000)
-);
-
CREATE TABLE mint_info (
id SERIAL PRIMARY KEY,
version VARCHAR(1000) NOT NULL
@@ -130,8 +116,8 @@
origin VARCHAR(1000)
);
CREATE UNIQUE INDEX binding_qmfIdsUnique ON binding (qmf_broker_id, qmf_scope_id, qmf_object_id);
-CREATE INDEX binding_statsCurrIndex ON binding (stats_curr_id);
CREATE INDEX binding_statsPrevIndex ON binding (stats_prev_id);
+CREATE INDEX binding_statsCurrIndex ON binding (stats_curr_id);
CREATE TABLE binding_stats (
id SERIAL PRIMARY KEY,
@@ -197,8 +183,7 @@
staging_threshold BIGINT,
mgmt_pub_interval INT,
version VARCHAR(1000),
- data_dir VARCHAR(1000),
- registration_id INT
+ data_dir VARCHAR(1000)
);
CREATE UNIQUE INDEX broker_qmfIdsUnique ON broker (qmf_broker_id, qmf_scope_id, qmf_object_id);
CREATE INDEX broker_statsCurrIndex ON broker (stats_curr_id);
@@ -335,6 +320,8 @@
name VARCHAR(1000),
type VARCHAR(1000),
durable BOOL,
+ auto_delete BOOL,
+ exchange_id INT,
arguments BYTEA
);
CREATE UNIQUE INDEX exchange_qmfIdsUnique ON exchange (qmf_broker_id, qmf_scope_id, qmf_object_id);
@@ -381,8 +368,8 @@
grid_resource_unavailable_time TIMESTAMP
);
CREATE UNIQUE INDEX grid_qmfIdsUnique ON grid (qmf_broker_id, qmf_scope_id, qmf_object_id);
-CREATE INDEX grid_statsPrevIndex ON grid (stats_prev_id);
CREATE INDEX grid_statsCurrIndex ON grid (stats_curr_id);
+CREATE INDEX grid_statsPrevIndex ON grid (stats_prev_id);
CREATE TABLE grid_stats (
id SERIAL PRIMARY KEY,
@@ -796,7 +783,6 @@
disk BIGINT,
file_system_domain VARCHAR(1000),
global_job_id VARCHAR(1000),
- image_size BIGINT,
is_valid_checkpoint_platform VARCHAR(4000),
job_id VARCHAR(1000),
job_start TIMESTAMP,
@@ -853,6 +839,7 @@
console_idle BIGINT,
entered_current_activity TIMESTAMP,
entered_current_state TIMESTAMP,
+ image_size BIGINT,
keyboard_idle BIGINT,
last_benchmark TIMESTAMP,
last_fetch_work_completed TIMESTAMP,
@@ -982,8 +969,8 @@
schedd_name VARCHAR(1000)
);
CREATE UNIQUE INDEX submitter_qmfIdsUnique ON submitter (qmf_broker_id, qmf_scope_id, qmf_object_id);
-CREATE INDEX submitter_statsCurrIndex ON submitter (stats_curr_id);
CREATE INDEX submitter_statsPrevIndex ON submitter (stats_prev_id);
+CREATE INDEX submitter_statsCurrIndex ON submitter (stats_curr_id);
CREATE TABLE submitter_stats (
id SERIAL PRIMARY KEY,
@@ -1095,8 +1082,6 @@
ALTER TABLE broker_group_mapping ADD CONSTRAINT broker_group_id_exists FOREIGN KEY (broker_group_id) REFERENCES broker_group (id) ON DELETE CASCADE;
-ALTER TABLE broker_registration ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
-
ALTER TABLE subject_role_mapping ADD CONSTRAINT subject_id_exists FOREIGN KEY (subject_id) REFERENCES subject (id) ON DELETE CASCADE;
ALTER TABLE subject_role_mapping ADD CONSTRAINT role_id_exists FOREIGN KEY (role_id) REFERENCES role (id) ON DELETE CASCADE;
@@ -1143,8 +1128,6 @@
ALTER TABLE broker ADD CONSTRAINT system_id_exists FOREIGN KEY (system_id) REFERENCES system (id) ON DELETE SET NULL;
-ALTER TABLE broker ADD CONSTRAINT registration_id_exists FOREIGN KEY (registration_id) REFERENCES broker_registration (id) ON DELETE SET NULL;
-
ALTER TABLE broker_stats ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
ALTER TABLE client_connection ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES client_connection_stats (id) ON DELETE SET NULL;
@@ -1175,6 +1158,8 @@
ALTER TABLE exchange ADD CONSTRAINT vhost_id_exists FOREIGN KEY (vhost_id) REFERENCES vhost (id) ON DELETE SET NULL;
+ALTER TABLE exchange ADD CONSTRAINT exchange_id_exists FOREIGN KEY (exchange_id) REFERENCES exchange (id) ON DELETE SET NULL;
+
ALTER TABLE exchange_stats ADD CONSTRAINT exchange_id_exists FOREIGN KEY (exchange_id) REFERENCES exchange (id) ON DELETE SET NULL;
ALTER TABLE grid ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES grid_stats (id) ON DELETE SET NULL;
rhmessaging commits: r3615 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-09-03 11:45:27 -0400 (Thu, 03 Sep 2009)
New Revision: 3615
Modified:
mgmt/trunk/mint/python/mint/model.py
Log:
Remove an unused mint class
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2009-09-03 15:33:19 UTC (rev 3614)
+++ mgmt/trunk/mint/python/mint/model.py 2009-09-03 15:45:27 UTC (rev 3615)
@@ -87,13 +87,6 @@
version = StringCol(length=1000, default="0.1", notNone=True)
-class CollectorRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None)
- collectorId = StringCol(length=1000, default=None)
-
class BrokerGroup(SQLObject):
class sqlmeta:
lazyUpdate = True
rhmessaging commits: r3614 - in mgmt/trunk: cumin/python/cumin/inventory and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-09-03 11:33:19 -0400 (Thu, 03 Sep 2009)
New Revision: 3614
Removed:
mgmt/trunk/cumin/python/cumin/managementserver.py
mgmt/trunk/cumin/python/cumin/managementserver.strings
Modified:
mgmt/trunk/cumin/python/cumin/inventory/system.py
mgmt/trunk/cumin/python/cumin/messaging/broker.py
mgmt/trunk/cumin/python/cumin/messaging/connection.py
mgmt/trunk/cumin/python/cumin/messaging/queue.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/parameters.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/poll.py
mgmt/trunk/mint/python/mint/sql.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/wooly/python/wooly/pages.py
Log:
* BrokerRegistration is no longer used; remove it and its associated
code from the tree
* Fix a few render_title methods with residual get_args
Modified: mgmt/trunk/cumin/python/cumin/inventory/system.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/inventory/system.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/cumin/python/cumin/inventory/system.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -227,18 +227,18 @@
# using a loop instead of an sql select with an outer join
brokers = Broker.select()
+
for broker in brokers:
if broker.system.nodeName == system.nodeName:
- if broker.registrationID:
- daemons.append(BrokerRegistration.get(broker.registrationID))
+ daemons.append(broker)
return daemons
def render_item_content(self, session, item):
- if isinstance(item, BrokerRegistration):
- reg = Identifiable(item.id)
- href = self.page.main.messaging.broker.get_href(session, reg)
- return fmt_link(href, item.url)
+ if isinstance(item, Broker):
+ broker = Identifiable(item.id)
+ href = self.page.main.messaging.broker.get_href(session, broker)
+ return fmt_link(href, item.port) # XXX item.port is too lame
else:
pool = Pool(item.Pool)
self.page.main.grid.pool.object.set(session, pool)
@@ -265,7 +265,7 @@
return "Scheduler"
elif isinstance(item, Negotiator):
return "Negotiator"
- elif isinstance(item, BrokerRegistration):
+ elif isinstance(item, Broker):
return "Broker"
else:
return "Daemon"
Deleted: mgmt/trunk/cumin/python/cumin/managementserver.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/managementserver.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/cumin/python/cumin/managementserver.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -1,230 +0,0 @@
-from mint import *
-from wooly import *
-from wooly.widgets import *
-
-from widgets import *
-from parameters import *
-from formats import *
-from util import *
-
-strings = StringCatalog(__file__)
-
-class ManagementServerSet(CuminSelectionTable):
- def __init__(self, app, name):
- item = Parameter(app, "item")
- super(ManagementServerSet, self).__init__(app, name, item)
-
- col = self.NameColumn(app, "name")
- self.add_column(col)
- self.set_default_column(col)
-
- col = self.UrlColumn(app, "url")
- self.add_column(col)
-
- col = self.StatusColumn(app, "status")
- self.add_column(col)
-
- #task = main.module.xxx
- #button = TaskButton(app, "remove", task, self.selection)
- #self.buttons.add_child(button)
-
- def render_title(self, session):
- return "Managent Servers"
-
- class NameColumn(SqlTableColumn):
- def render_title(self, session, data):
- return "Name"
-
- class UrlColumn(SqlTableColumn):
- def render_title(self, session, data):
- return "URL"
-
- class StatusColumn(ItemTableColumn):
- def render_title(self, session, data):
- return "Status"
-
- def render_content(self, session, data):
- url = data["url"]
-
- try:
- server = self.app.model.mint.model.mintBrokersByUrl[url]
-
- if server.connected:
- html = "Connected"
- else:
- html = "Disconnected"
- except KeyError:
- html = "Unreachable"
-
- return html
-
-class ManagementServerSetAdd(CuminForm, Frame):
- def __init__(self, app, name):
- super(ManagementServerSetAdd, self).__init__(app, name)
-
- self.name_param = Parameter(app, "name_param");
- self.add_parameter(self.name_param)
-
- self.names = ListParameter(app, "name", self.name_param)
- self.add_parameter(self.names)
-
- self.name_errors = self.Errors(self, "name_errors")
- self.add_attribute(self.name_errors)
-
- self.addr_param = Parameter(app, "addr_param")
- self.add_parameter(self.addr_param)
-
- self.addrs = ListParameter(app, "address", self.addr_param)
- self.add_parameter(self.addrs)
-
- self.addr_errors = self.Errors(self, "addr_errors")
- self.add_attribute(self.addr_errors)
-
- self.fields = IntegerParameter(app, "fields")
- self.fields.default = 3
- self.add_parameter(self.fields)
-
- self.field_tmpl = Template(self, "field_html")
-
- self.more_button = self.MoreEntries(app, "more_button")
- self.add_child(self.more_button)
-
- self.more = self.More(app, "more")
- self.add_parameter(self.more)
-
- def process_cancel(self, session):
- branch = session.branch()
- self.frame.view.show(branch)
- self.page.set_redirect_url(session, branch.marshal())
-
- def process_submit(self, session):
- action = self.app.model.management_server.add
-
- addrs = self.addrs.get(session)
- names = self.names.get(session)
- fields = self.fields.get(session)
-
- if self.validate(session, addrs, names):
- for i in range(0, fields):
- try:
- addr = addrs[i]
- except:
- addr = None
-
- if addr:
- name = names[i]
- url = "amqp://%s:%i" % parse_broker_addr(addr)
-
- args = {"name": name, "url": url}
- reg = action.invoke(None, args)
-
- self.process_cancel(session)
-
- def validate(self, session, addrs, names):
- nerrs = self.name_errors.get(session)
- aerrs = self.addr_errors.get(session)
- fields = self.fields.get(session)
-
- for i in range(0, fields):
- try:
- addr, name = addrs[i], names[i]
- except:
- addr = name = None
-
- if not addr and not name:
- pass # It's just an empty row
- else:
- if not name:
- errs = nerrs.setdefault(i, list())
- errs.append("The name field is empty; it is required")
- elif BrokerRegistration.selectBy(name=name).count():
- errs = nerrs.setdefault(i, list())
- errs.append("A broker called '%s' already exists" % name)
-
- if not addr:
- errs = aerrs.setdefault(i, list())
- errs.append("The address field is empty; it is required")
- else:
- try:
- parse_broker_addr(addr)
- except:
- errs = aerrs.setdefault(i, list())
- errs.append("The address is malformed")
- break
-
- count = BrokerRegistration.selectBy \
- (url=addr).count()
-
- if count:
- errs = aerrs.setdefault(i, list())
- errs.append("The broker at %s " % (url) +
- "is already registered")
-
- return not len(nerrs) and not len(aerrs)
-
- def render_title(self, session):
- return "Register New Brokers"
-
- class Errors(Attribute):
- def get_default(self, session):
- return dict()
-
- class More(Parameter):
- def get_default(self, session):
- return ""
-
- def process_display(self, session):
- if self.more.get(session):
- self.fields.set(session, self.fields.get(session) + 3)
-
- def render_fields(self, session):
- writer = Writer()
-
- for i in range(self.fields.get(session)):
- self.field_tmpl.render(writer, session, i)
-
- return writer.to_string()
-
- def render_field_name_name(self, session, index):
- return self.names.path
-
- def render_field_name_value(self, session, index):
- names = self.names.get(session)
- if len(names) > index:
- return names[index]
-
- def render_field_name_errors(self, session, index):
- errors = self.name_errors.get(session)
- if index in errors:
- return "<ul class=\"errors\"><li>%s</li></ul>" % \
- "</li><li>".join(errors[index])
-
- def render_field_address_name(self, session, index):
- return self.addrs.path
-
- def render_field_address_value(self, session, index):
- addrs = self.addrs.get(session)
- if len(addrs) > index:
- return addrs[index]
-
- def render_field_address_errors(self, session, index):
- errors = self.addr_errors.get(session)
- if index in errors:
- return "<ul class=\"errors\"><li>%s</li></ul>" % \
- "</li><li>".join(errors[index])
-
- def render_more_id(self, session):
- return self.more_button.path
-
- def render_more_name(self, session):
- return self.more.path
-
- class MoreEntries(FormButton):
- def render_content(self, session):
- return "More Entries"
-
- def render_class(self, session):
- return "more"
-
- def render_type(self, session):
- return "button"
Deleted: mgmt/trunk/cumin/python/cumin/managementserver.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/managementserver.strings 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/cumin/python/cumin/managementserver.strings 2009-09-03 15:33:19 UTC (rev 3614)
@@ -1,178 +0,0 @@
-[ManagementServerSet.sql]
-select br.id, br.name, br.url
-from broker_registration as br
-{sql_where}
-{sql_orderby}
-{sql_limit}
-
-[ManagementServerSet.count_sql]
-select count(*)
-from broker_registration as br
-{sql_where}
-
-[ManagementServerSetAdd.css]
-table.ManagementServerSetAdd td, table.ManagementServerSetAdd th {
- padding: 0.25em;
-}
-
-table.ManagementServerSetAdd span.example {
- font-weight: normal;
- font-size: 0.8em;
- font-style: italic;
-}
-
-[ManagementServerSetAdd.html]
-<form id="{id}" class="mform" method="post" action="?">
- <div class="head">
- <h1>{title}</h1>
- </div>
- <div class="body">
- <fieldset>
- <table class="ManagementServerSetAdd">
- <tr>
- <th>Name</th>
- <th>
- Address
- <br/>
- <span class="example">Examples: example.net, example.net:5672, 172.16.82.10</span>
- </th>
- </tr>
-
- {fields}
- </table>
-
- {more_button}<input type="hidden" name="{more_name}" value="" />
- </fieldset>
-
- {hidden_inputs}
- </div>
- <div class="foot">
- {help}
- {submit}
- {cancel}
- </div>
-</form>
-<script type="text/javascript" defer="defer">
-<![CDATA[
-(function() {
- var elem = $("{id}").elements[1];
-
- elem.focus();
- elem.select();
-
- function attach_more_id() {
- var oMore = document.getElementById("{more_id}");
- if (oMore) {
- oMore.onclick = function() {
- document.forms[0].elements["{more_name}"].value = "t";
- document.forms[0].submit();
- }
- }
- }
-
- addEvent(window, "load", attach_more_id);
-}())
-]]>
-</script>
-
-[ManagementServerSetAdd.field_html]
-<tr>
- <td>
- <input type="text" name="{field_name_name}" value="{field_name_value}"
- size="15" tabindex="100"/>
- {field_name_errors}
- </td>
- <td>
- <input type="text" name="{field_address_name}" value="{field_address_value}"
- size="35" tabindex="100"/>
- {field_address_errors}
- </td>
-</tr>
-[ManagementServerSet.sql]
-select br.id, br.name, br.url
-from broker_registration as br
-{sql_where}
-{sql_orderby}
-{sql_limit}
-
-[ManagementServerSet.count_sql]
-select count(*)
-from broker_registration as br
-{sql_where}
-
-[ManagementServerSetAdd.css]
-table.ManagementServerSetAdd td, table.ManagementServerSetAdd th {
- padding: 0.25em;
-}
-
-table.ManagementServerSetAdd span.example {
- font-weight: normal;
- font-size: 0.8em;
- font-style: italic;
-}
-
-[ManagementServerSetAdd.html]
-<form id="{id}" class="mform" method="post" action="?">
- <div class="head">
- <h1>{title}</h1>
- </div>
- <div class="body">
- <fieldset>
- <table class="ManagementServerSetAdd">
- <tr>
- <th>Name</th>
- <th>
- Address
- <br/>
- <span class="example">Examples: example.net, example.net:5672, 172.16.82.10</span>
- </th>
- </tr>
-
- {fields}
- </table>
-
- {more_button}<input type="hidden" name="{more_name}" value="" />
- </fieldset>
-
- {hidden_inputs}
- </div>
- <div class="foot">
- {help}
- {submit}
- {cancel}
- </div>
-</form>
-<script type="text/javascript" defer="defer">
-<![CDATA[
-(function() {
- var elem = $("{id}").elements[1];
-
- elem.focus();
- elem.select();
-
- window.addEvent('domready', function () {
- var oMore = $("{more_id}");
- if (oMore) {
- oMore.onclick = function() {
- document.forms[0].elements["{more_name}"].value = "t";
- document.forms[0].submit();
- }
- }
- });
-}())
-]]>
-</script>
-
-[ManagementServerSetAdd.field_html]
-<tr>
- <td>
- <input type="text" name="{field_name_name}" value="{field_name_value}"
- size="15" tabindex="100"/>
- {field_name_errors}
- </td>
- <td>
- <input type="text" name="{field_address_name}" value="{field_address_value}"
- size="35" tabindex="100"/>
- {field_address_errors}
- </td>
-</tr>
Modified: mgmt/trunk/cumin/python/cumin/messaging/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/broker.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/cumin/python/cumin/messaging/broker.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -320,197 +320,6 @@
return fmt_link(href, name, class_)
-class BrokerSetForm(CuminForm, Frame):
- def __init__(self, app, name):
- super(BrokerSetForm, self).__init__(app, name)
-
- self.name_param = Parameter(app, "name_param");
- self.add_parameter(self.name_param)
-
- self.names = ListParameter(app, "name", self.name_param)
- self.add_parameter(self.names)
-
- self.name_errors = self.Errors(self, "name_errors")
- self.add_attribute(self.name_errors)
-
- self.addr_param = Parameter(app, "addr_param")
- self.add_parameter(self.addr_param)
-
- self.addrs = ListParameter(app, "address", self.addr_param)
- self.add_parameter(self.addrs)
-
- self.addr_errors = self.Errors(self, "addr_errors")
- self.add_attribute(self.addr_errors)
-
- self.group_param = BrokerGroupParameter(app, "group_param")
- self.group_param.default = None
- self.add_parameter(self.group_param)
-
- self.groups = ListParameter(app, "group", self.group_param)
- self.add_parameter(self.groups)
-
- self.fields = IntegerParameter(app, "fields")
- self.fields.default = 3
- self.add_parameter(self.fields)
-
- self.field_tmpl = Template(self, "field_html")
- self.group_tmpl = Template(self, "group_html")
-
- self.more = self.MoreEntries(app, "more")
- self.add_child(self.more)
-
- class Errors(Attribute):
- def get_default(self, session):
- return dict()
-
- def process_display(self, session):
- if self.more.get(session):
- self.fields.set(session, self.fields.get(session) + 3)
-
- def render_fields(self, session):
- writer = Writer()
-
- for i in range(self.fields.get(session)):
- self.field_tmpl.render(writer, session, i)
-
- return writer.to_string()
-
- def render_field_name_name(self, session, index):
- return self.names.path
-
- def render_field_name_value(self, session, index):
- names = self.names.get(session)
- if len(names) > index:
- return escape_entity(names[index])
-
- def render_field_name_errors(self, session, index):
- errors = self.name_errors.get(session)
- if index in errors:
- return "<ul class=\"errors\"><li>%s</li></ul>" % \
- "</li><li>".join(errors[index])
-
- def render_field_address_name(self, session, index):
- return self.addrs.path
-
- def render_field_address_value(self, session, index):
- addrs = self.addrs.get(session)
- if len(addrs) > index:
- return escape_entity(addrs[index])
-
- def render_field_address_errors(self, session, index):
- errors = self.addr_errors.get(session)
- if index in errors:
- return "<ul class=\"errors\"><li>%s</li></ul>" % \
- "</li><li>".join(errors[index])
-
- def render_field_group_name(self, session, index):
- return self.groups.path
-
- def render_groups(self, session, index):
- writer = Writer()
-
- for group in BrokerGroup.select():
- self.group_tmpl.render(writer, session, index, group)
-
- return writer.to_string()
-
- def render_group_value(self, session, index, group):
- return group.id
-
- def render_group_name(self, session, index, group):
- return group.name
-
- def render_group_selected_attr(self, session, index, group):
- groups = self.groups.get(session)
- if len(groups) > index:
- if groups[index] and group.id == groups[index].id:
- return "selected=\"selected\""
-
- class MoreEntries(FormButton):
- def render_content(self, session):
- return "More Entries"
-
-class BrokerSetAdd(BrokerSetForm):
- def process_cancel(self, session):
- branch = session.branch()
- self.frame.view.show(branch)
- self.page.set_redirect_url(session, branch.marshal())
-
- def process_submit(self, session):
- action = self.app.model.broker_registration.add
-
- addrs = self.addrs.get(session)
- names = self.names.get(session)
- groups = self.groups.get(session)
- fields = self.fields.get(session)
-
- if self.validate(session, addrs, names, groups):
- for i in range(0, fields):
- try:
- addr = addrs[i]
- except:
- addr = None
-
- if addr:
- name = names[i]
- url = "amqp://%s:%i" % parse_broker_addr(addr)
-
- args = {"name": name, "url": url}
- reg = action.invoke(None, args)
-
- if len(groups) > i:
- group = groups[i]
-
- if group:
- reg.addBrokerGroup(group)
-
- self.process_cancel(session)
-
- def validate(self, session, addrs, names, groups):
- nerrs = self.name_errors.get(session)
- aerrs = self.addr_errors.get(session)
- fields = self.fields.get(session)
-
- for i in range(0, fields):
- try:
- addr, name = addrs[i], names[i]
- except:
- addr = name = None
-
- if not addr and not name:
- pass # It's just an empty row
- else:
- if not name:
- errs = nerrs.setdefault(i, list())
- errs.append("The name field is empty; it is required")
- elif BrokerRegistration.selectBy(name=name).count():
- errs = nerrs.setdefault(i, list())
- errs.append("A broker called '%s' already exists" % name)
-
- if not addr:
- errs = aerrs.setdefault(i, list())
- errs.append("The address field is empty; it is required")
- else:
- #host, port = parse_broker_addr(addr)
- count = BrokerRegistration.selectBy \
- (url=addr).count()
-
- if count:
- errs = aerrs.setdefault(i, list())
- errs.append("The broker at %s " % (addr) +
- "is already registered")
-
- return not len(nerrs) and not len(aerrs)
-
- def render_title(self, session):
- return "Register New Brokers"
-
- def process_display(self, session, reg):
- self.broker_name.set(session, reg.name)
-
- for group in reg.groups:
- self.groups.get(session).append(group)
-
class BrokerSetEngroupForm(CuminTaskForm):
def __init__(self, app, name, task):
super(BrokerSetEngroupForm, self).__init__(app, name, task)
Modified: mgmt/trunk/cumin/python/cumin/messaging/connection.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/connection.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/cumin/python/cumin/messaging/connection.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -189,7 +189,7 @@
return "Statistics"
class SendReceiveRateChart(StatFlashChart):
- def render_title(self, session, conn):
+ def render_title(self, session):
return "Bytes sent and received"
class SessionSetTaskForm(CuminTaskForm):
Modified: mgmt/trunk/cumin/python/cumin/messaging/queue.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/queue.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/cumin/python/cumin/messaging/queue.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -711,7 +711,7 @@
return "Durability"
class EnqueueDequeueRateChart(StatFlashChart):
- def render_title(self, session, queue):
+ def render_title(self, session):
return "Durable messages enqueued and eequeued"
class JournalStats(StatSet):
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/cumin/python/cumin/model.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -57,7 +57,6 @@
CuminBrokerAclModule(self)
CuminBrokerClusterModule(self)
- CuminManagementServer(self)
CuminBrokerGroup(self)
# Grid
@@ -1735,26 +1734,6 @@
prop = CuminProperty(self, "members")
prop.title = "Members"
-class CuminManagementServer(CuminClass):
- def __init__(self, model):
- super(CuminManagementServer, self).__init__ \
- (model, "management_server", BrokerRegistration)
-
- prop = CuminProperty(self, "url")
- prop.title = "URL"
- prop.summary = True
-
- def init(self):
- super(CuminManagementServer, self).init()
-
- self.frame = self.model.frame.messaging.broker # XXX
-
- def get_title(self, session):
- return "Management Server"
-
- def get_icon_href(self, session):
- return "resource?name=broker-36.png"
-
class CuminBrokerGroup(CuminClass):
def __init__(self, model):
super(CuminBrokerGroup, self).__init__ \
Modified: mgmt/trunk/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/parameters.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/cumin/python/cumin/parameters.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -65,13 +65,6 @@
for vhost in Vhost.selectBy(broker=broker, name="/"):
return vhost
-class BrokerRegistrationParameter(Parameter):
- def do_unmarshal(self, string):
- return BrokerRegistration.get(int(string))
-
- def do_marshal(self, broker):
- return str(broker.id)
-
class ConnectionParameter(Parameter):
def do_unmarshal(self, string):
return ClientConnection.get(int(string))
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/mint/python/mint/model.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -33,10 +33,6 @@
except TypeError:
pass
-Broker.sqlmeta.addColumn(ForeignKey("BrokerRegistration",
- cascade="null", default=None,
- name="registration"))
-
class Subject(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -98,26 +94,6 @@
name = StringCol(length=1000, default=None)
collectorId = StringCol(length=1000, default=None)
-class BrokerRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- url = StringCol(length=1000, default=None)
- broker = ForeignKey("Broker", cascade="null", default=None)
-
- url_unique = DatabaseIndex(url, unique=True)
-
- def getBrokerId(self):
- return self.mintBroker.qmfId
-
- def getDefaultVhost(self):
- if self.broker:
- try:
- return Vhost.selectBy(broker=self.broker, name="/")[0]
- except IndexError:
- return None
-
class BrokerGroup(SQLObject):
class sqlmeta:
lazyUpdate = True
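The BrokerRegistration class deleted here is a standard SQLObject declaration; the one subtlety is that uniqueness is enforced two ways: via unique/notNone on the name column, and via an explicit DatabaseIndex on url. A minimal sketch of the same shape (assuming an SQLObject connection is already configured, and using a stand-in class name):

    from sqlobject import SQLObject, StringCol, DatabaseIndex

    class Registration(SQLObject):  # stand-in for the removed class
        class sqlmeta:
            lazyUpdate = True  # attribute writes are deferred until syncUpdate()

        name = StringCol(length=1000, default=None, unique=True, notNone=True)
        url = StringCol(length=1000, default=None)

        # a unique index can also be declared separately from the column,
        # as the removed code did
        url_unique = DatabaseIndex(url, unique=True)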
Modified: mgmt/trunk/mint/python/mint/poll.py
===================================================================
--- mgmt/trunk/mint/python/mint/poll.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/mint/python/mint/poll.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -1,7 +1,6 @@
import logging
from time import sleep
from psycopg2 import OperationalError
-from mint.model import BrokerRegistration
from util import MintDaemonThread
@@ -29,18 +28,4 @@
sleep(self.interval)
def do_run(self):
- regUrls = set()
-
- for reg in BrokerRegistration.select():
- if reg.url not in self.app.model.mintBrokersByUrl:
- try:
- self.app.model.addBroker(reg.url)
- except socket.error, e:
- log.info("Can't connect to broker at %s: %s", reg.url, e)
-
- regUrls.add(reg.url)
-
- for mbroker in self.app.model.mintBrokersByQmfBroker.values():
- if mbroker.url not in regUrls:
- self.app.model.delBroker(mbroker)
-
+ pass
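The do_run body being emptied here implemented a reconcile-by-set pattern: connect to every registered URL that is not already attached, then disconnect any attached broker whose URL is no longer registered. A generic sketch of that pattern, with connect/disconnect callables standing in for addBroker/delBroker and written in modern Python:

    import socket

    def reconcile(registered_urls, connected, connect, disconnect, log):
        # connected maps url -> connection handle
        wanted = set(registered_urls)
        for url in wanted:
            if url not in connected:
                try:
                    connect(url)
                except socket.error as e:
                    log.info("Can't connect to broker at %s: %s", url, e)
        # list() so disconnect() may safely mutate the mapping
        for url, handle in list(connected.items()):
            if url not in wanted:
                disconnect(handle)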
Modified: mgmt/trunk/mint/python/mint/sql.py
===================================================================
--- mgmt/trunk/mint/python/mint/sql.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/mint/python/mint/sql.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -170,31 +170,6 @@
return sql
-class SqlGetBrokerRegistration(SqlOperation):
- def __init__(self):
- super(SqlGetBrokerRegistration, self).__init__("get_broker_reg")
-
- def generate(self):
- return """
- select id
- from broker_registration
- where url = %(url)s
- """
-
-class SqlAttachBroker(SqlOperation):
- def __init__(self):
- super(SqlAttachBroker, self).__init__("attach_broker")
-
- def generate(self):
- return """
- update broker_registration
- set broker_id = %(id)s
- where id = %(registrationId)s;
- update broker
- set registration_id = %(registrationId)s
- where id = %(id)s
- """
-
class SqlProfile(object):
def __init__(self):
self.ops = list()
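The two SqlOperation subclasses removed above are thin wrappers around psycopg2's named-parameter style (%(name)s placeholders). Outside the wrapper, the same queries reduce to plain cursor calls; a hedged sketch, assuming a psycopg2 connection, the broker_registration table from the removed model, and illustrative values:

    import psycopg2

    conn = psycopg2.connect("dbname=cumin")  # connection details are illustrative
    cur = conn.cursor()

    # look up a registration by URL, as SqlGetBrokerRegistration did
    cur.execute("select id from broker_registration where url = %(url)s",
                {"url": "amqp://localhost:5672"})
    rec = cur.fetchone()

    if rec:
        # link the broker row and the registration row in both directions,
        # as SqlAttachBroker did (split into two statements here)
        params = {"id": 42, "registrationId": rec[0]}  # 42 is illustrative
        cur.execute("update broker_registration set broker_id = %(id)s"
                    " where id = %(registrationId)s", params)
        cur.execute("update broker set registration_id = %(registrationId)s"
                    " where id = %(id)s", params)
        conn.commit()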
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/mint/python/mint/update.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -286,9 +286,6 @@
assert cursor.rowcount == 1
self.broker.objectDatabaseIds.set(oid, id)
-
- if cls is Broker:
- self.processBroker(cursor, id)
else:
# Case 3
@@ -313,25 +310,6 @@
thread.propUpdateCount += 1
- def processBroker(self, cursor, id):
- if self.broker.databaseId is None:
- op = SqlGetBrokerRegistration()
- op.execute(cursor, {"url": self.broker.url})
-
- rec = cursor.fetchone()
-
- #print op.text, {"url": self.broker.url}
-
- if rec:
- rid = rec[0]
-
- op = SqlAttachBroker()
- op.execute(cursor, {"id": id, "registrationId": rid})
-
- #print op.text, {"id": id, "registrationId": rid}
-
- self.broker.databaseId = id
-
class StatisticUpdate(ModelUpdate):
def process(self, thread):
try:
Modified: mgmt/trunk/wooly/python/wooly/pages.py
===================================================================
--- mgmt/trunk/wooly/python/wooly/pages.py 2009-09-02 14:03:11 UTC (rev 3613)
+++ mgmt/trunk/wooly/python/wooly/pages.py 2009-09-03 15:33:19 UTC (rev 3614)
@@ -234,8 +234,12 @@
def get(self, session):
sess = super(UpdatePage.SessionParameter, self).get(session)
- sess.user_session = session.user_session
+
+ if hasattr(session, "user_session"):
+ sess.user_session = session.user_session
+
sess.background = True
+
return sess
class WidgetParameter(Parameter):
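The pages.py fix above guards against sessions that lack a user_session attribute before copying it across. An equivalent, slightly more compact idiom uses getattr() with a default; a sketch, with SessionStub standing in for the real session objects:

    class SessionStub(object):
        pass

    def copy_user_session(src, dest):
        # only propagate user_session when the source actually carries one,
        # mirroring the hasattr() guard added above
        user_session = getattr(src, "user_session", None)
        if user_session is not None:
            dest.user_session = user_session
        dest.background = True

    src, dest = SessionStub(), SessionStub()
    copy_user_session(src, dest)
    print(hasattr(dest, "user_session"))  # False: nothing was copied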