rhmessaging commits: r1731 - mgmt/cumin/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-02-27 15:10:18 -0500 (Wed, 27 Feb 2008)
New Revision: 1731
Modified:
mgmt/cumin/bin/cumin-admin
Log:
Check that the user name is unique
Modified: mgmt/cumin/bin/cumin-admin
===================================================================
--- mgmt/cumin/bin/cumin-admin 2008-02-27 19:37:37 UTC (rev 1730)
+++ mgmt/cumin/bin/cumin-admin 2008-02-27 20:10:18 UTC (rev 1731)
@@ -7,6 +7,7 @@
from random import sample
from crypt import crypt
from mint import MintDatabase, ConsoleUser
+from psycopg2 import IntegrityError
from cumin.util import *
@@ -90,15 +91,22 @@
print "Error: no user name given"
usage()
+ name = args[1]
+
+ if ConsoleUser.selectBy(name=name).count():
+ print "Error: a user called '%s' already exists" % name
+ sys.exit(1)
+
password = getpass()
-
chs = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
- ch0, ch1 = sample(chs, 2)
+ crypted = crypt(password, "".join(sample(chs, 2)))
- user = ConsoleUser()
- user.name = args[1]
- user.password = crypt(password, ch0 + ch1)
- user.syncUpdate()
+ try:
+ ConsoleUser(name=name, password=crypted)
+ except IntegrityError:
+ print "Error: a user called '%s' already exists" % name
+ sys.exit(1)
+
elif command == "remove-user":
if force:
if len(args) != 2:
16 years, 10 months
rhmessaging commits: r1730 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-02-27 14:37:37 -0500 (Wed, 27 Feb 2008)
New Revision: 1730
Modified:
mgmt/cumin/python/cumin/broker.strings
mgmt/notes/justin-todo.txt
Log:
Add examples to the host field of the broker add form.
Modified: mgmt/cumin/python/cumin/broker.strings
===================================================================
--- mgmt/cumin/python/cumin/broker.strings 2008-02-27 15:54:01 UTC (rev 1729)
+++ mgmt/cumin/python/cumin/broker.strings 2008-02-27 19:37:37 UTC (rev 1730)
@@ -207,6 +207,12 @@
padding: 0.25em;
}
+table.BrokerSetForm span.example {
+ font-weight: normal;
+ font-size: 0.8em;
+ font-style: italic;
+}
+
[BrokerSetForm.html]
<form id="{id}" class="mform" method="post" action="?">
<div class="head">
@@ -217,7 +223,11 @@
<table class="BrokerSetForm">
<tr>
<th>Name</th>
- <th>Host Name or IP Address</th>
+ <th>
+ Domain Name or IP Address
+ <br/>
+ <span class="example">Examples: example.net, example.net:5762, 172.16.82.10</span>
+ </th>
<th>Initial Group</th>
</tr>
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2008-02-27 15:54:01 UTC (rev 1729)
+++ mgmt/notes/justin-todo.txt 2008-02-27 19:37:37 UTC (rev 1730)
@@ -1,9 +1,5 @@
Current
- * Add simple user auth
-
- * Move to cherrypy web server
-
* Bulk actions: Add javascript for the check-all behavior
* Tables: Add sort direction icon
@@ -12,8 +8,6 @@
* Get rid of single object confirms
- * Add examples of legit broker addresses in broker add form
-
* Add broker reg name unique constraint and validation
* Add unique constraint to user name, and deal with it in cumin-admin
16 years, 10 months
rhmessaging commits: r1729 - in store/trunk/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-02-27 10:54:01 -0500 (Wed, 27 Feb 2008)
New Revision: 1729
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/tests/persistence.py
store/trunk/cpp/tests/system_test.sh
Log:
Fix to delete bindings when queue is deleted
Fix to remove stale messages on recovery
Extra tests
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-02-27 15:54:01 UTC (rev 1729)
@@ -269,6 +269,7 @@
{
checkInit();
destroy(queueDb, queue);
+ deleteBindingsForQueue(queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
if (eqs)
{
@@ -501,6 +502,7 @@
exchange->second->bind(queueName, routingkey, args);
} else {
//stale binding, delete it
+ QPID_LOG(warning, "Deleting stale binding");
bindings->del(0);
}
}
@@ -675,10 +677,11 @@
int count(0);
for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) {
- RecoverableQueue::shared_ptr queue = index[value.id];
- if (!queue) {
+ if (index.find(value.id) == index.end()) {
QPID_LOG(warning, "Recovered message for queue that no longer exists");
+ mappings->del(0);
} else {
+ RecoverableQueue::shared_ptr queue = index[value.id];
if (PreparedTransaction::isLocked(locked, value.id, msgId.id)) {
prepared[msgId.id] = msg;
} else {
@@ -1334,6 +1337,34 @@
}
}
+void BdbMessageStore::deleteBindingsForQueue(const PersistableQueue& queue)
+{
+ TxnCtxt txn;
+ txn.begin(env, true);
+ try {
+ Cursor bindings;
+ bindings.open(bindingDb, txn.get());
+
+ IdDbt key;
+ Dbt value;
+ while (bindings.next(key, value)) {
+ Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ if (buffer.available() < 8) {
+ THROW_STORE_EXCEPTION("Not enough data for binding");
+ }
+ uint64_t queueId = buffer.getLongLong();
+ if (queue.getPersistenceId() == queueId) {
+ bindings->del(0);
+ QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ }
+ }
+ } catch (const std::exception& e) {
+ THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+ }
+ txn.commit();
+ QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
+}
+
string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-02-27 15:54:01 UTC (rev 1729)
@@ -122,6 +122,7 @@
bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
+ void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
u_int64_t getRecordSize(Db& db, Dbt& key);
u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/tests/persistence.py 2008-02-27 15:54:01 UTC (rev 1729)
@@ -247,7 +247,55 @@
else:
self.assertEqual(0, channel.queue_query(queue=q).message_count)
+ def phase7(self):
+ channel = self.channel
+ channel.synchronous = False
+ #test deletion of queue after publish
+ #create queue
+ channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ for i in range(1, 10):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+ channel.synchronous = True
+ #explicitly delete queue
+ channel.queue_delete(queue = "q")
+
+ #test acking of message from auto-deleted queue
+ #create queue
+ channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+ #create consumer
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=0)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+ queue = self.client.queue("a")
+
+ #consume the message, cancel subscription (triggering auto-delete), then ack it
+ msg = queue.get(timeout = 5)
+ channel.message_cancel(destination = "a")
+ msg.complete()
+
+ #test implicit deletion of bindings when queue is deleted
+ channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
+ channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
+ channel.queue_delete(queue = "durable-subscriber-queue")
+
+ def phase8(self):
+ channel = self.channel
+
+ channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
+ channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
+ channel.queue_delete(queue = "durable-subscriber-queue")
+
+
def xid(self, txid, branchqual = ''):
return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2008-02-26 17:09:41 UTC (rev 1728)
+++ store/trunk/cpp/tests/system_test.sh 2008-02-27 15:54:01 UTC (rev 1729)
@@ -72,7 +72,7 @@
mode='bdb'
echo 'BDB persistence...'
fi
- for p in `seq 1 6`; do
+ for p in `seq 1 8`; do
log="$abs_srcdir/vg-log.$mode.$p"
#echo "$vg $QPIDD -m 0 --data dir "" --load-module $LIBBDBSTORE $JRNLFLAGS"
$vg $QPIDD -m 0 --data-dir "" --load-module $LIBBDBSTORE $JRNLFLAGS >> "$abs_srcdir/qpid.log" 2> $log & pid=$!
16 years, 10 months
rhmessaging commits: r1728 - in mgmt: cumin/etc and 2 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-02-26 12:09:41 -0500 (Tue, 26 Feb 2008)
New Revision: 1728
Added:
mgmt/cumin-test-0/etc/cumin.crt
mgmt/cumin-test-0/etc/cumin.key
mgmt/cumin/etc/cumin.crt
mgmt/cumin/etc/cumin.key
Modified:
mgmt/cumin/bin/cumin
mgmt/cumin/python/wooly/server.py
Log:
Improves the configuration of ssl certs and keys.
Adds a self-signed dummy cert that's only good for demo and
development purposes.
Modified: mgmt/cumin/bin/cumin
===================================================================
--- mgmt/cumin/bin/cumin 2008-02-26 16:42:17 UTC (rev 1727)
+++ mgmt/cumin/bin/cumin 2008-02-26 17:09:41 UTC (rev 1728)
@@ -35,6 +35,15 @@
host = socket.gethostname()
server = CuminServer(app, host, port)
+ cpath = os.path.join(home, "etc", "cumin.crt")
+ kpath = os.path.join(home, "etc", "cumin.key")
+
+ if os.path.isfile(cpath):
+ server.set_ssl_cert_path(cpath)
+
+ if os.path.isfile(kpath):
+ server.set_ssl_key_path(kpath)
+
try:
server.start()
except:
Added: mgmt/cumin/etc/cumin.crt
===================================================================
--- mgmt/cumin/etc/cumin.crt (rev 0)
+++ mgmt/cumin/etc/cumin.crt 2008-02-26 17:09:41 UTC (rev 1728)
@@ -0,0 +1,40 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQC8zL7h6pevyBeme9LMDvCKEBOzKuBMr4QUsnsBdKa+DO21wZWd
+X3h/GKhV7ELgl7j/G4lZVYyy8jbFa7YlEawrTucryIv6VVCI4nKnw6KuclHP0UTm
+niENl6grDqu3s7cEom02nQTjtRAVn/2Tx+VO5b9M6u+4SNm686DdvH/7bwIDAQAB
+AoGBAJv4VVUTVX6dQZSCxLFZQjP8dhnUuPoBIY9rpxybvshQq+w0YaJh5Yyx1Rcu
+mE0samMt33jSqOqIGOFdS1syT1UFwdfN2PjoPCdwlkEefN1yOqlU3IYUs0AcU8Vt
++HZfR5nCZl0lzV8bHVTOhkYBHvsTKT1RTqd6A6gPD2WMkt4xAkEA+SdLRmxigF5i
+1E02C2D3REVgdMG/8/s+y03QD1/eYb0T8H6RsUqdPbz7TfLdjBZSGe2Lqwaluq1E
+sc4ZDEhnlwJBAMH84oKGwO4XWTVQIPqZ0yNwAyi32Wk8FsHH484beJjvexclyysC
+kDoNURq1tP4wLInAs7gYzoBLkw3sHBCCRekCQF3DCtHywdLbMkgA0oDPud3oWdc2
+fp07hLMPkmfmxRJ1DEbp6rwUfz4Xm18HMZahY748Jd5zvvSUdhCPpd94c/sCQGwp
+ES6P7fCSyiWkqM6xh/0BTnGcmLdJTrL7PfREcuhq1qngY7h2dG450Jfg6qqzt5GT
+BWHHSxRTjwm7NSarTEkCQQCzFsf/s6imADbjw17bkIwdcyIO3LC9KZzeXwvX9zjR
+ZkOMh3I9h4EYW9gxx6K3Hdsc59FaFN5bDM63nGTxHEzG
+-----END RSA PRIVATE KEY-----
+
+-----BEGIN CERTIFICATE-----
+MIIEHDCCA4WgAwIBAgIJALFw4Q2OWzQ4MA0GCSqGSIb3DQEBBQUAMIG7MQswCQYD
+VQQGEwItLTESMBAGA1UECBMJU29tZVN0YXRlMREwDwYDVQQHEwhTb21lQ2l0eTEZ
+MBcGA1UEChMQU29tZU9yZ2FuaXphdGlvbjEfMB0GA1UECxMWU29tZU9yZ2FuaXph
+dGlvbmFsVW5pdDEeMBwGA1UEAxMVbG9jYWxob3N0LmxvY2FsZG9tYWluMSkwJwYJ
+KoZIhvcNAQkBFhpyb290QGxvY2FsaG9zdC5sb2NhbGRvbWFpbjAeFw0wODAyMjYx
+NjUwMzlaFw0wOTAyMjUxNjUwMzlaMIG7MQswCQYDVQQGEwItLTESMBAGA1UECBMJ
+U29tZVN0YXRlMREwDwYDVQQHEwhTb21lQ2l0eTEZMBcGA1UEChMQU29tZU9yZ2Fu
+aXphdGlvbjEfMB0GA1UECxMWU29tZU9yZ2FuaXphdGlvbmFsVW5pdDEeMBwGA1UE
+AxMVbG9jYWxob3N0LmxvY2FsZG9tYWluMSkwJwYJKoZIhvcNAQkBFhpyb290QGxv
+Y2FsaG9zdC5sb2NhbGRvbWFpbjCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA
+vMy+4eqXr8gXpnvSzA7wihATsyrgTK+EFLJ7AXSmvgzttcGVnV94fxioVexC4Je4
+/xuJWVWMsvI2xWu2JRGsK07nK8iL+lVQiOJyp8OirnJRz9FE5p4hDZeoKw6rt7O3
+BKJtNp0E47UQFZ/9k8flTuW/TOrvuEjZuvOg3bx/+28CAwEAAaOCASQwggEgMB0G
+A1UdDgQWBBSG2ASHUAYMHesFOdIuwEP2JXLRETCB8AYDVR0jBIHoMIHlgBSG2ASH
+UAYMHesFOdIuwEP2JXLREaGBwaSBvjCBuzELMAkGA1UEBhMCLS0xEjAQBgNVBAgT
+CVNvbWVTdGF0ZTERMA8GA1UEBxMIU29tZUNpdHkxGTAXBgNVBAoTEFNvbWVPcmdh
+bml6YXRpb24xHzAdBgNVBAsTFlNvbWVPcmdhbml6YXRpb25hbFVuaXQxHjAcBgNV
+BAMTFWxvY2FsaG9zdC5sb2NhbGRvbWFpbjEpMCcGCSqGSIb3DQEJARYacm9vdEBs
+b2NhbGhvc3QubG9jYWxkb21haW6CCQCxcOENjls0ODAMBgNVHRMEBTADAQH/MA0G
+CSqGSIb3DQEBBQUAA4GBABAhLdp86AYqfd7g1ICfeXyQK9rAvg6dAzgND6PFE0mc
+8aBYGT7vkFFv97cxxfjtDlRpjfsxdIlKxeZZ2JpP0pW6DUdn243CUZ+4g0AYSUs6
+OboH5gkG03eJGYT1w8+7F13DQFAEvEwAtf28xItlH3Y4YeXU3Z5U/mpY+Rh6vlxk
+-----END CERTIFICATE-----
Added: mgmt/cumin/etc/cumin.key
===================================================================
--- mgmt/cumin/etc/cumin.key (rev 0)
+++ mgmt/cumin/etc/cumin.key 2008-02-26 17:09:41 UTC (rev 1728)
@@ -0,0 +1 @@
+link cumin.crt
\ No newline at end of file
Property changes on: mgmt/cumin/etc/cumin.key
___________________________________________________________________
Name: svn:special
+ *
Modified: mgmt/cumin/python/wooly/server.py
===================================================================
--- mgmt/cumin/python/wooly/server.py 2008-02-26 16:42:17 UTC (rev 1727)
+++ mgmt/cumin/python/wooly/server.py 2008-02-26 17:09:41 UTC (rev 1728)
@@ -18,9 +18,13 @@
addr = (self.host, self.port)
apps = [("", self.service)]
self.__server = CherryPyWSGIServer(addr, apps)
- self.__server.ssl_certificate = None # "/tmp/localhost.crt"
- self.__server.ssl_private_key = None # "/tmp/localhost.key"
+ def set_ssl_cert_path(self, path):
+ self.__server.ssl_certificate = path
+
+ def set_ssl_key_path(self, path):
+ self.__server.ssl_private_key = path
+
def start(self):
self.__server.start()
Added: mgmt/cumin-test-0/etc/cumin.crt
===================================================================
--- mgmt/cumin-test-0/etc/cumin.crt (rev 0)
+++ mgmt/cumin-test-0/etc/cumin.crt 2008-02-26 17:09:41 UTC (rev 1728)
@@ -0,0 +1 @@
+link ../../cumin/etc/cumin.crt
\ No newline at end of file
Property changes on: mgmt/cumin-test-0/etc/cumin.crt
___________________________________________________________________
Name: svn:special
+ *
Added: mgmt/cumin-test-0/etc/cumin.key
===================================================================
--- mgmt/cumin-test-0/etc/cumin.key (rev 0)
+++ mgmt/cumin-test-0/etc/cumin.key 2008-02-26 17:09:41 UTC (rev 1728)
@@ -0,0 +1 @@
+link ../../cumin/etc/cumin.key
\ No newline at end of file
Property changes on: mgmt/cumin-test-0/etc/cumin.key
___________________________________________________________________
Name: svn:special
+ *
16 years, 10 months
rhmessaging commits: r1727 - in mgmt: cumin/python/cumin and 2 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-02-26 11:42:17 -0500 (Tue, 26 Feb 2008)
New Revision: 1727
Added:
mgmt/cumin/bin/cumin-admin
Removed:
mgmt/cumin/bin/cumin-database
Modified:
mgmt/cumin/bin/cumin
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/page.py
mgmt/cumin/python/cumin/page.strings
mgmt/cumin/python/wooly/__init__.py
mgmt/cumin/python/wooly/server.py
mgmt/notes/justin-todo.txt
Log:
Adds user auth to cumin.
Introduces a CuminServer class that performs the authorization.
Replaces cumin-database with a new tool, cumin-admin, that now does
both schema operations and adds and removes users.
Restores the current user UI (for real this time) and implements log
out.
Modified: mgmt/cumin/bin/cumin
===================================================================
--- mgmt/cumin/bin/cumin 2008-02-26 16:34:01 UTC (rev 1726)
+++ mgmt/cumin/bin/cumin 2008-02-26 16:42:17 UTC (rev 1727)
@@ -1,7 +1,6 @@
#!/usr/bin/env python
-import sys, os
-from wooly.server import WebServer
+import sys, os, socket
from cumin import *
from cumin.util import *
@@ -33,12 +32,11 @@
app.init()
- server = WebServer(app, port)
+ host = socket.gethostname()
+ server = CuminServer(app, host, port)
try:
server.start()
- except KeyboardInterrupt:
- server.stop()
except:
server.stop()
raise
@@ -70,4 +68,7 @@
do_main(home, data, port, debug)
if __name__ == "__main__":
- main()
+ try:
+ main()
+ except KeyboardInterrupt:
+ pass
Added: mgmt/cumin/bin/cumin-admin
===================================================================
--- mgmt/cumin/bin/cumin-admin (rev 0)
+++ mgmt/cumin/bin/cumin-admin 2008-02-26 16:42:17 UTC (rev 1727)
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+
+import sys, os
+from sqlobject import connectionForURI
+from traceback import print_exc
+from getpass import getpass
+from random import sample
+from crypt import crypt
+from mint import MintDatabase, ConsoleUser
+
+from cumin.util import *
+
+def usage():
+ print """Usage: cumin-admin [OPTIONS...] COMMAND
+Options:
+ -h, --help Print this message
+ --data URL Connect to database at URL
+ (default postgesql://cumin@localhost/cumin)
+ --force yes Don't complain and just do it
+Commands:
+ create-schema Create the database schema
+ drop-schema Drop the database schema; requires "--force yes"
+
+ add-user NAME Add user called NAME
+ remove-user NAME Remove user called NAME; requires "--force yes"
+ list-users List existing users
+"""
+ sys.exit(1)
+
+def parse_command_args():
+ args = list()
+ parg = ""
+
+ for arg in sys.argv[1:]:
+ if not arg.startswith("--") and not parg.startswith("--"):
+ args.append(arg)
+
+ parg = arg
+
+ return args
+
+def main():
+ if "-h" in sys.argv or "--help" in sys.argv:
+ usage()
+
+ home = os.environ.get("CUMIN_HOME")
+
+ if not home:
+ home = os.path.normpath("/usr/share/cumin")
+
+ config = Config()
+ config.add_param("data", "s", "postgresql://cumin@localhost/cumin")
+ config.add_param("force", "b", False)
+
+ config.load_file(os.path.join(home, "etc", "cumin.conf"))
+ config.load_file(os.path.join(os.path.expanduser("~"), ".cumin.conf"))
+ config.load_args(sys.argv)
+
+ config.prt()
+
+ home = os.environ["CUMIN_HOME"]
+ data = config.get("data")
+ force = config.get("force")
+
+ args = parse_command_args()
+
+ if not args:
+ print "Error: no command found"
+ usage()
+
+ command = args[0]
+
+ database = MintDatabase(data)
+ database.check()
+ database.init()
+
+ if command == "create-schema":
+ main = os.path.join(home, "sql", "schema.sql")
+ indexes = os.path.join(home, "sql", "indexes.sql")
+
+ database.createSchema((main, indexes))
+ elif command == "drop-schema":
+ if force:
+ database.dropSchema()
+ else:
+ print "Error: command create-schema requires --force yes"
+ usage()
+ elif command == "add-user":
+ if len(args) != 2:
+ print "Error: no user name given"
+ usage()
+
+ password = getpass()
+
+ chs = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+ ch0, ch1 = sample(chs, 2)
+
+ user = ConsoleUser()
+ user.name = args[1]
+ user.password = crypt(password, ch0 + ch1)
+ user.syncUpdate()
+ elif command == "remove-user":
+ if force:
+ if len(args) != 2:
+ print "Error: no user name given"
+ usage()
+
+ accounts = ConsoleUser.selectBy(name=args[1])
+
+ if accounts.count():
+ for account in accounts:
+ account.destroySelf()
+ return
+ else:
+ print "Error: no such user '%s'" % args[1]
+ else:
+ print "Error: command remove-user requires --force yes"
+ usage()
+
+ elif command == "list-users":
+ accounts = ConsoleUser.select(orderBy='name')
+
+ for account in accounts:
+ print "%4i %s" % (account.id, account.name)
+
+ count = accounts.count()
+ print "(%i user%s found)" % (count, ess(count))
+ else:
+ print "Error: command '%s' not recognized" % command
+ usage()
+
+if __name__ == "__main__":
+ try:
+ main()
+ except SystemExit, e:
+ raise e
+ except:
+ print_exc()
+ sys.exit(1)
Property changes on: mgmt/cumin/bin/cumin-admin
___________________________________________________________________
Name: svn:executable
+ *
Deleted: mgmt/cumin/bin/cumin-database
===================================================================
--- mgmt/cumin/bin/cumin-database 2008-02-26 16:34:01 UTC (rev 1726)
+++ mgmt/cumin/bin/cumin-database 2008-02-26 16:42:17 UTC (rev 1727)
@@ -1,90 +0,0 @@
-#!/usr/bin/env python
-
-import sys, os
-from sqlobject import connectionForURI
-from traceback import print_exc
-from mint import MintDatabase
-
-from cumin.util import *
-
-def usage():
- print """Usage: cumin-database [OPTIONS...] COMMAND
-Options:
- -h, --help Print this message
- --data URL Connect to database at URL
- (default postgesql://cumin@localhost/cumin)
- --force yes Don't complain and just do it
-Commands:
- create-schema Create the database schema; requires --force
- drop-schema Drop the database schema; requires --force
-"""
- sys.exit(1)
-
-def parse_command():
- parg = ""
-
- for arg in sys.argv[1:]:
- if not arg.startswith("--") and not parg.startswith("--"):
- return arg
-
- parg = arg
-
-def main():
- if "-h" in sys.argv or "--help" in sys.argv:
- usage()
-
- home = os.environ.get("CUMIN_HOME")
-
- if not home:
- home = os.path.normpath("/usr/share/cumin")
-
- config = Config()
- config.add_param("data", "s", "postgresql://cumin@localhost/cumin")
- config.add_param("force", "b", False)
-
- config.load_file(os.path.join(home, "etc", "cumin.conf"))
- config.load_file(os.path.join(os.path.expanduser("~"), ".cumin.conf"))
- config.load_args(sys.argv)
-
- config.prt()
-
- home = os.environ["CUMIN_HOME"]
- data = config.get("data")
- force = config.get("force")
-
- command = parse_command()
-
- if command is None:
- print "Error: no command found"
- usage()
-
- database = MintDatabase(data)
- database.checkConnection()
-
- if command == "create-schema":
- if force:
- main = os.path.join(home, "sql", "schema.sql")
- indexes = os.path.join(home, "sql", "indexes.sql")
-
- database.createSchema((main, indexes))
- else:
- print "Command create-schema requires --force yes"
- sys.exit(1)
- elif command == "drop-schema":
- if force:
- database.dropSchema()
- else:
- print "Command create-schema requires --force yes"
- sys.exit(1)
- else:
- print "Error: command '%s' not recognized" % command
- usage()
-
-if __name__ == "__main__":
- try:
- main()
- except SystemExit, e:
- raise e
- except:
- print_exc()
- sys.exit(1)
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-02-26 16:34:01 UTC (rev 1726)
+++ mgmt/cumin/python/cumin/__init__.py 2008-02-26 16:42:17 UTC (rev 1727)
@@ -1,4 +1,4 @@
-import sys, os, socket
+import sys, os
from random import randint
from wooly import Application, Session, Page
@@ -10,6 +10,8 @@
from time import sleep
from threading import Thread, Event
from traceback import print_exc
+from base64 import decodestring
+from crypt import crypt
from model import CuminModel, ModelPage
from demo import DemoData
@@ -90,3 +92,35 @@
reg.connect(self.model.data)
self.event.wait(10)
+
+class CuminServer(WebServer):
+ def authorized(self, session):
+ name = session.credentials.get("name")
+ password = session.credentials.get("password")
+
+ if name:
+ try:
+ user = ConsoleUser.selectBy(name=name)[0]
+ except IndexError:
+ return False
+
+ if user:
+ crypted = user.password
+
+ if crypted and crypt(password, crypted) == crypted:
+ lch = user.lastChallenged
+
+ if lch:
+ timeout = timedelta(seconds=3600)
+ now = datetime.now()
+
+ if now - lch < timeout:
+ lout = user.lastLoggedOut
+
+ if lout is None or lout < lch:
+ return True
+
+ user.lastChallenged = datetime.now()
+ user.syncUpdate()
+
+ return False
Modified: mgmt/cumin/python/cumin/page.py
===================================================================
--- mgmt/cumin/python/cumin/page.py 2008-02-26 16:34:01 UTC (rev 1726)
+++ mgmt/cumin/python/cumin/page.py 2008-02-26 16:42:17 UTC (rev 1727)
@@ -1,6 +1,7 @@
from wooly import *
from wooly.widgets import *
from wooly.resources import *
+from time import sleep
from broker import *
from brokergroup import *
@@ -58,6 +59,9 @@
def __init__(self, app, name):
super(MainFrame, self).__init__(app, name)
+ self.__logout = BooleanParameter(app, "logout")
+ self.add_parameter(self.__logout)
+
self.__frame_tmpl = Template(self, "frame_html")
self.__tabs = MainFrameTabs(app, "tabs")
@@ -95,6 +99,14 @@
def get_title(self, session, object):
return "Main"
+ def render_user_name(self, session, object):
+ return session.credentials.get("name")
+
+ def render_logout_href(self, session, object):
+ branch = session.branch()
+ self.__logout.set(branch, True)
+ return branch.marshal()
+
def render_frames(self, session, object):
self.__object.set(session, object)
writer = Writer()
@@ -163,6 +175,31 @@
frame.set_object(session, system)
return self.page().set_current_frame(session, frame)
+ def do_process(self, session, object):
+ if self.__logout.get(session):
+ self.__logout.set(session, False)
+
+ name = session.credentials.get("name")
+
+ try:
+ user = ConsoleUser.selectBy(name=name)[0]
+ except IndexError:
+ pass
+
+ if user:
+ user.lastLoggedOut = datetime.now()
+ user.syncUpdate()
+
+ # XXX boy, this sucks. necessary because the
+ # resolution of TimestampCol (lastLoggedOut) is too
+ # coarse for the subsequent comparison of
+ # lastLoggedOut and lastChallenged
+ sleep(1)
+
+ self.page().set_redirect_url(session, session.marshal())
+
+ super(MainFrame, self).do_process(session, object)
+
class MainFrameTabs(LinkSet):
def __init__(self, app, name):
super(MainFrameTabs, self).__init__(app, name)
Modified: mgmt/cumin/python/cumin/page.strings
===================================================================
--- mgmt/cumin/python/cumin/page.strings 2008-02-26 16:34:01 UTC (rev 1726)
+++ mgmt/cumin/python/cumin/page.strings 2008-02-26 16:42:17 UTC (rev 1727)
@@ -708,12 +708,10 @@
[MainFrame.html]
<div id="head">
<div>
- <!--
<ul id="user">
- <li>Hi, <strong>user</strong></li>
- <li><a class="nav" href="">Log Out</a></li>
+ <li>Hi, <strong>{user_name}</strong></li>
+ <li><a class="nav" onclick="wooly.clearUpdates()" href="{logout_href}">Log Out</a></li>
</ul>
- -->
{tabs}
</div>
Modified: mgmt/cumin/python/wooly/__init__.py
===================================================================
--- mgmt/cumin/python/wooly/__init__.py 2008-02-26 16:34:01 UTC (rev 1726)
+++ mgmt/cumin/python/wooly/__init__.py 2008-02-26 16:42:17 UTC (rev 1727)
@@ -518,6 +518,7 @@
self.trunk = trunk
self.page = None
self.values = dict()
+ self.credentials = dict()
if self.app.debug:
self.debug = self.Debug(self)
Modified: mgmt/cumin/python/wooly/server.py
===================================================================
--- mgmt/cumin/python/wooly/server.py 2008-02-26 16:34:01 UTC (rev 1726)
+++ mgmt/cumin/python/wooly/server.py 2008-02-26 16:42:17 UTC (rev 1727)
@@ -1,7 +1,7 @@
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from traceback import print_exc
from datetime import datetime
from time import strptime
+from base64 import decodestring
from wooly import *
from devel import DevelPage
@@ -10,12 +10,13 @@
class WebServer(object):
http_date = "%a, %d %b %Y %H:%M:%S %Z"
- def __init__(self, app, port=8080):
+ def __init__(self, app, host, port):
self.app = app
+ self.host = host
self.port = port
- addr = ("localhost", self.port)
- apps = [("", self.wsgi_app)]
+ addr = (self.host, self.port)
+ apps = [("", self.service)]
self.__server = CherryPyWSGIServer(addr, apps)
self.__server.ssl_certificate = None # "/tmp/localhost.crt"
self.__server.ssl_private_key = None # "/tmp/localhost.key"
@@ -23,13 +24,11 @@
def start(self):
self.__server.start()
- print "Web server started on port %s" % (self.port)
-
def stop(self):
self.__server.stop()
- def wsgi_app(self, env, respond):
- if env["PATH_INFO"].endswith(".html"):
+ def service(self, env, respond):
+ if False and env["PATH_INFO"].endswith(".html"):
print "------------------------------------"
for key in sorted(env):
print key, env[key]
@@ -38,6 +37,17 @@
session.unmarshal_page(env["PATH_INFO"])
session.unmarshal_url_vars(env["QUERY_STRING"])
+ if "HTTP_AUTHORIZATION" in env:
+ str = env["HTTP_AUTHORIZATION"].split(" ")[1]
+ name, password = decodestring(str).split(":", 1)
+ session.credentials["name"] = name
+ session.credentials["password"] = password
+
+ if not self.authorized(session):
+ headers = [("WWW-Authenticate", "Basic realm=\"MRG Management\"")]
+ respond("401 Unauthorized", headers)
+ return ()
+
if env["REQUEST_METHOD"] == "POST":
if env["CONTENT_TYPE"] == "application/x-www-form-urlencoded":
length = int(env["CONTENT_LENGTH"])
@@ -101,6 +111,9 @@
return response
+ def authorized(self, session):
+ return False
+
def error(self, session, respond):
respond("500 Error", [("Content-Type", "text/plain")])
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2008-02-26 16:34:01 UTC (rev 1726)
+++ mgmt/notes/justin-todo.txt 2008-02-26 16:42:17 UTC (rev 1727)
@@ -16,8 +16,12 @@
* Add broker reg name unique constraint and validation
+ * Add unique constraint to user name, and deal with it in cumin-admin
+
Deferred
+ * Blow up if we try to call set_redirect_url twice in a session
+
* Need to add cherrypy bsd license to binary dist?
* See if we can't avoid the app.add_parameter in Parameter; adding to
16 years, 10 months
rhmessaging commits: r1726 - in mgmt/mint: sql and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-02-26 11:34:01 -0500 (Tue, 26 Feb 2008)
New Revision: 1726
Modified:
mgmt/mint/python/mint/__init__.py
mgmt/mint/sql/schema.sql
Log:
Add a new model object, ConsoleUser; generate a new schema file; introduce check and init methods on MintDatabase
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-02-26 16:23:41 UTC (rev 1725)
+++ mgmt/mint/python/mint/__init__.py 2008-02-26 16:34:01 UTC (rev 1726)
@@ -71,6 +71,12 @@
value = StringCol(length=1000, default=None)
type = StringCol(length=1, default="s")
+class ConsoleUser(SQLObject):
+ name = StringCol(length=1000, default=None)
+ password = StringCol(length=1000, default=None)
+ lastChallenged = TimestampCol(default=None)
+ lastLoggedOut = TimestampCol(default=None)
+
class OriginalIdDict:
def __init__(self):
self.idMap = dict()
@@ -315,6 +321,13 @@
def getConnection(self):
return connectionForURI(self.uri).getConnection()
+ def check(self):
+ self.checkConnection()
+
+ def init(self):
+ conn = connectionForURI(self.uri)
+ sqlhub.processConnection = conn
+
def checkConnection(self):
conn = self.getConnection()
Modified: mgmt/mint/sql/schema.sql
===================================================================
--- mgmt/mint/sql/schema.sql 2008-02-26 16:23:41 UTC (rev 1725)
+++ mgmt/mint/sql/schema.sql 2008-02-26 16:34:01 UTC (rev 1726)
@@ -37,6 +37,14 @@
type VARCHAR(1)
);
+CREATE TABLE console_user (
+ id SERIAL PRIMARY KEY,
+ name VARCHAR(1000),
+ password VARCHAR(1000),
+ last_challenged TIMESTAMP,
+ last_logged_out TIMESTAMP
+);
+
CREATE TABLE binding (
id SERIAL PRIMARY KEY,
id_original BIGINT,
@@ -80,8 +88,7 @@
initial_disk_page_size INT,
initial_pages_per_queue INT,
cluster_name VARCHAR(1000),
- version VARCHAR(1000),
- registration_id INT
+ version VARCHAR(1000)
);
CREATE TABLE broker_stats (
@@ -378,8 +385,6 @@
ALTER TABLE broker ADD CONSTRAINT system_id_exists FOREIGN KEY (system_id) REFERENCES system (id) ON DELETE SET NULL;
-ALTER TABLE broker ADD CONSTRAINT registration_id_exists FOREIGN KEY (registration_id) REFERENCES broker_registration (id) ON DELETE SET NULL;
-
ALTER TABLE broker_stats ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
ALTER TABLE client ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES client_stats (id) ON DELETE SET NULL;
@@ -457,3 +462,4 @@
ALTER TABLE vhost ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
ALTER TABLE vhost_stats ADD CONSTRAINT vhost_id_exists FOREIGN KEY (vhost_id) REFERENCES vhost (id) ON DELETE SET NULL;
+
16 years, 10 months
rhmessaging commits: r1725 - mgmt/cumin/resources.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-02-26 11:23:41 -0500 (Tue, 26 Feb 2008)
New Revision: 1725
Modified:
mgmt/cumin/resources/wooly.js
Log:
Add a function to disable interval updates for cases where we don't want any further http requests
Modified: mgmt/cumin/resources/wooly.js
===================================================================
--- mgmt/cumin/resources/wooly.js 2008-02-22 17:21:51 UTC (rev 1724)
+++ mgmt/cumin/resources/wooly.js 2008-02-26 16:23:41 UTC (rev 1725)
@@ -204,6 +204,7 @@
this.assert = assert;
this.log = log;
this.dir = dir;
+ this.updater_ids = [];
if (window.console) {
this.console = window.console;
@@ -219,6 +220,8 @@
}
var id = window.setInterval(fetch, interval);
+
+ this.updater_ids.push(id);
function update() {
try {
@@ -237,6 +240,12 @@
}
}
+ this.clearUpdates = function() {
+ for (var i = 0; i < this.updater_ids.length; i++) {
+ window.clearInterval(this.updater_ids[i])
+ }
+ }
+
this._doc = new WoolyDocument(document);
this.doc = function(doc) {
16 years, 10 months
rhmessaging commits: r1724 - in store/trunk/cpp: tests/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-02-22 12:21:51 -0500 (Fri, 22 Feb 2008)
New Revision: 1724
Added:
store/trunk/cpp/tests/jrnl/_st_basic.cpp
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jinf.cpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/
store/trunk/cpp/tests/jrnl/Makefile.am
store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Fixed double-dequeue problem, some bugs in jtt, added a double-deueue test.
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-02-22 17:21:51 UTC (rev 1724)
@@ -485,6 +485,7 @@
try
{
rd._ffid = ji.get_start_file();
+ rd._lfid = ji.get_end_file();
rd._owi = ji.get_initial_owi();
rd._empty = false;
}
@@ -663,12 +664,10 @@
}
break;
case 0:
- rd._lfid = fid;
rd._eo = ifsp->tellg();
return false;
default:
// Stop as this is the overwrite boundary.
- rd._lfid = fid;
rd._eo = file_pos;
return false;
}
@@ -696,7 +695,7 @@
// Tried this, but did not work
// if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL || h._magic != 0) throw;
check_journal_alignment(start_fid, start_file_offs);
- rd._lfid = start_fid;
+// rd._lfid = start_fid;
rd._eo = start_file_offs;
return false;
}
@@ -714,9 +713,8 @@
if (ifsp->eof() || !ifsp->good())
{
rd._eo = ifsp->tellg(); // remember file offset before closing
- rd._lfid = fid++;
ifsp->close();
- if (fid >= _num_jfiles)
+ if (++fid >= _num_jfiles)
{
fid = 0;
lowi = !lowi; // Flip local owi
@@ -763,7 +761,6 @@
if (fid == expected_fid)
{
check_journal_alignment(fid, file_pos);
- rd._lfid = fid;
rd._eo = file_pos;
return false;
}
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2008-02-22 17:21:51 UTC (rev 1724)
@@ -163,7 +163,6 @@
if (!_valid_flag)
validate();
-// for (u_int16_t fnum=0; fnum < _num_jfiles; fnum++)
u_int16_t fnum=0;
while (!done && fnum < _num_jfiles)
{
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-02-22 17:21:51 UTC (rev 1724)
@@ -193,12 +193,35 @@
if (ok && !found)
{
std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid) << " rid=" << rid;
+ oss << std::hex << "xid=" << xid_format(xid) << " rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "set_aio_compl");
}
return ok;
}
+const txn_data&
+txn_map::get_data(const std::string& xid, const u_int64_t rid)
+{
+ txn_data_list tdl = get_tdata_list(xid);
+ bool found = false;
+ {
+ slock s(&_mutex);
+ tdl_itr itr = tdl.begin();
+ while (itr != tdl.end() && !found)
+ {
+ found = itr->_rid == rid;
+ itr++;
+ }
+ if (!found)
+ {
+ std::ostringstream oss;
+ oss << std::hex << "xid=" << xid_format(xid) << " rid=0x" << rid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_data");
+ }
+ return *itr;
+ }
+}
+
void
txn_map::xid_list(std::vector<std::string>& xv)
{
@@ -218,7 +241,7 @@
return xid;
std::ostringstream oss;
oss << "\"" << xid.substr(0, 20) << " ... " << xid.substr(xid.size() - 20, 20);
- oss << "\" (size: " << xid.size() << ")";
+ oss << "\" [size: " << xid.size() << "]";
return oss.str();
}
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-02-22 17:21:51 UTC (rev 1724)
@@ -125,6 +125,7 @@
const u_int32_t get_rid_count(const std::string& xid);
const bool is_txn_synced(const std::string& xid);
const bool set_aio_compl(const std::string& xid, const u_int64_t rid);
+ const txn_data& get_data(const std::string& xid, const u_int64_t rid);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-02-22 17:21:51 UTC (rev 1724)
@@ -37,8 +37,6 @@
#include <jrnl/jcntl.hpp>
#include <jrnl/jerrno.hpp>
-#include <iostream> // debug
-
namespace rhm
{
namespace journal
@@ -141,8 +139,6 @@
throw jexception(jerrno::JERR_WMGR_ENQDISCONT, oss.str(), "wmgr", "enqueue");
}
}
- else
- _enq_busy = true;
u_int64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
_enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient,
@@ -155,6 +151,7 @@
dtokp->set_xid(xid_ptr, xid_len);
else
dtokp->clear_xid();
+ _enq_busy = true;
}
bool done = false;
while (!done)
@@ -276,11 +273,6 @@
throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "dequeue");
}
}
- else
- {
- _deq_busy = true;
- dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
- }
const bool ext_rid = dtokp->external_rid();
u_int64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
@@ -297,6 +289,9 @@
dtokp->set_xid(xid_ptr, xid_len);
else
dtokp->clear_xid();
+ dequeue_check(dtokp->xid(), dequeue_rid);
+ dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
+ _deq_busy = true;
}
bool done = false;
while (!done)
@@ -421,11 +416,6 @@
throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "abort");
}
}
- else
- {
- _abort_busy = true;
- dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
- }
u_int64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
_txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi());
@@ -434,6 +424,8 @@
dtokp->set_rid(rid);
dtokp->set_dequeue_rid(0);
dtokp->set_xid(xid_ptr, xid_len);
+ dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
+ _abort_busy = true;
}
bool done = false;
while (!done)
@@ -565,11 +557,6 @@
throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "commit");
}
}
- else
- {
- _commit_busy = true;
- dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
- }
u_int64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
_txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi());
@@ -578,6 +565,8 @@
dtokp->set_rid(rid);
dtokp->set_dequeue_rid(0);
dtokp->set_xid(xid_ptr, xid_len);
+ dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
+ _commit_busy = true;
}
bool done = false;
while (!done)
@@ -1014,6 +1003,19 @@
}
void
+wmgr::dequeue_check(const std::string& xid, const u_int64_t drid)
+{
+ // First check emap
+ try { _emap.get_fid(drid); }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw;
+ _tmap.get_data(xid, drid); // not in emap, try tmap
+ }
+}
+
+void
wmgr::dblk_roundup()
{
const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-02-22 17:21:51 UTC (rev 1724)
@@ -123,6 +123,7 @@
const iores pre_write_check(const _op_type op, const data_tok* const dtokp,
const size_t xidsize = 0, const size_t dsize = 0, const bool external = false)
const;
+ void dequeue_check(const std::string& xid, const u_int64_t drid);
const iores write_flush();
const iores rotate_file();
void dblk_roundup();
Property changes on: store/trunk/cpp/tests/jrnl
___________________________________________________________________
Name: svn:ignore
- .deps
.libs
Makefile
Makefile.in
jtest
_ut_enq_map
_ut_jdir
_ut_jerrno
_ut_jexception
_ut_jinf
_ut_rec_hdr
_ut_time_ns
_ut_txn_map
+ .deps
.libs
Makefile
Makefile.in
jtest
_ut_enq_map
_ut_jdir
_ut_jerrno
_ut_jexception
_ut_jinf
_ut_rec_hdr
_ut_time_ns
_ut_txn_map
_st_basic
Modified: store/trunk/cpp/tests/jrnl/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.am 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/tests/jrnl/Makefile.am 2008-02-22 17:21:51 UTC (rev 1724)
@@ -36,6 +36,7 @@
_ut_jdir \
_ut_enq_map \
_ut_txn_map \
+ _st_basic \
run-journal-tests
check_LTLIBRARIES = \
@@ -50,7 +51,8 @@
_ut_jinf \
_ut_jdir \
_ut_enq_map \
- _ut_txn_map
+ _ut_txn_map \
+ _st_basic
UNIT_TEST_SRCS = ../unit_test.cpp
UNIT_TEST_LDADD = -lboost_unit_test_framework -lbdbstore -L../../lib/.libs
@@ -79,6 +81,9 @@
_ut_txn_map_SOURCES = _ut_txn_map.cpp $(UNIT_TEST_SRCS)
_ut_txn_map_LDADD = $(UNIT_TEST_LDADD) -lrt
+_st_basic_SOURCES = _st_basic.cpp $(UNIT_TEST_SRCS)
+_st_basic_LDADD = $(UNIT_TEST_LDADD) -lrt
+
JournalSystemTests_la_SOURCES = \
JournalSystemTests.cpp \
JournalSystemTests.hpp
Added: store/trunk/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic.cpp (rev 0)
+++ store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-02-22 17:21:51 UTC (rev 1724)
@@ -0,0 +1,111 @@
+/**
+* \file _st_basic.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* This file contains the unit tests for the journal.
+*
+* Copyright 2007, 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "../unit_test.h"
+#include <jrnl/jcntl.hpp>
+#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
+
+#include <iostream>
+
+#define NUM_JFILES 4
+#define JFSIZE_SBLKS 96
+#define JRNL_DIR "/tmp/jdata"
+#define MAX_AIO_SLEEPS 500
+#define AIO_SLEEP_TIME 1000
+
+using namespace boost::unit_test;
+using namespace rhm::journal;
+using namespace std;
+
+QPID_AUTO_TEST_SUITE(journal_basic)
+
+// Helper function fwd decls
+u_int64_t enq_msg(jcntl& jc, const string msg, const bool transient);
+void deq_msg(jcntl& jc, const u_int64_t drid);
+bool handle_jcntl_response(const iores res, jcntl& jc, unsigned& aio_sleep_cnt);
+
+
+BOOST_AUTO_TEST_CASE(double_dequeue)
+{
+ char* test_name = "DoubleDequeue";
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_JFILES, JFSIZE_SBLKS);
+ jc.initialize(0, 0);
+ string msg("Message");
+ u_int64_t rid = enq_msg(jc, msg, false);
+ deq_msg(jc, rid);
+ try{ deq_msg(jc, rid); BOOST_FAIL("Did not throw exception on second dequeue."); }
+ catch (const jexception& e) { BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_MAP_NOTFOUND); }
+ rid = enq_msg(jc, msg, false);
+ deq_msg(jc, rid);
+}
+
+
+// Helper functions
+
+u_int64_t enq_msg(jcntl& jc, const string msg, const bool transient)
+{
+ data_tok* dtp = new data_tok;
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc.enqueue_data_record(msg.c_str(), msg.size(), msg.size(),
+ dtp, transient), jc, aio_sleep_cnt));
+ return dtp->rid();
+}
+
+void deq_msg(jcntl& jc, const u_int64_t drid)
+{
+ data_tok* dtp = new data_tok;
+ dtp->set_wstate(data_tok::ENQ);
+ dtp->set_rid(drid);
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc.dequeue_data_record(dtp), jc, aio_sleep_cnt));
+}
+
+bool handle_jcntl_response(const iores res, jcntl& jc, unsigned& aio_sleep_cnt)
+{
+ switch (res)
+ {
+ case RHM_IORES_SUCCESS:
+ return false;
+ case RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
+ {
+ jc.get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ }
+ else
+ BOOST_FAIL(iores_str(res));
+ break;
+ default:
+ BOOST_FAIL(iores_str(res));
+ }
+ return true;
+}
+
+QPID_AUTO_TEST_SUITE_END()
Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2008-02-22 17:21:51 UTC (rev 1724)
@@ -277,7 +277,7 @@
self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
if self.enq_tail.magic_inv != inv_str(self.magic) or self.enq_tail.rid != self.rid:
print " > %s" % self
- raise Exception('Invalid enqueue record tail (magic=0x%08x; rid=%d) at 0x%08x' % (self.enq_tail.magic_inv, self.enq_tail.rid, self.enq_tail.foffs))
+ raise Exception('Invalid dequeue record tail (magic=%s; rid=%d) at 0x%08x' % (self.enq_tail, self.enq_tail.rid, self.enq_tail.foffs))
self.enq_tail.skip(f)
self.tail_complete = ret[1]
return self.complete()
@@ -317,7 +317,7 @@
self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
if self.enq_tail.magic_inv != inv_str(self.magic) or self.enq_tail.rid != self.rid:
print " > %s" % self
- raise Exception('Invalid enqueue record tail (magic=0x%08x; rid=%d) at 0x%08x' % (self.enq_tail.magic_inv, self.enq_tail.rid, self.enq_tail.foffs))
+ raise Exception('Invalid transaction record tail (magic=%s; rid=%d) at 0x%08x' % (self.enq_tail, self.enq_tail.rid, self.enq_tail.foffs))
self.enq_tail.skip(f)
self.tail_complete = ret[1]
return self.complete()
@@ -388,7 +388,7 @@
self.enq_tail = RecTail(self.tail_offs, *unpack(RecTail.format, self.tail_bin))
if self.enq_tail.magic_inv != inv_str(self.magic) or self.enq_tail.rid != self.rid:
print " > %s" % self
- raise Exception('Invalid enqueue record tail (magic=0x%08x; rid=%d) at 0x%08x' % (self.enq_tail.magic_inv, self.enq_tail.rid, self.enq_tail.foffs))
+ raise Exception('Invalid enqueue record tail (magic=%s; rid=%d) at 0x%08x' % (self.enq_tail, self.enq_tail.rid, self.enq_tail.foffs))
self.enq_tail.skip(f)
self.tail_complete = ret[1]
return self.complete()
Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp 2008-02-22 17:21:51 UTC (rev 1724)
@@ -55,16 +55,23 @@
std::cout << "CSV file: \"" << _args.test_case_csv_file_name << "\"";
test_case_set tcs(_args.test_case_csv_file_name, _args.recover_mode);
- const unsigned num_test_cases = tcs.size();
- if (num_test_cases)
+
+ if (tcs.size())
{
std::cout << " (found " << tcs.size() << " test case" << (tcs.size() != 1 ? "s" : "") <<
")" << std::endl;
if (tcs.ignored())
- std::cout << "WARNING: " << tcs.ignored() << " test cases were ignored. "
- "(recover-mode selected and test has no auto-dequeue.)" << std::endl;
+ std::cout << "WARNING: " << tcs.ignored() << " test cases were ignored. (All test "
+ "cases without auto-dequeue are ignored when recover-mode is selected.)" <<
+ std::endl;
_args.print_args();
}
+ else if(tcs.ignored())
+ {
+ std::cout << " WARNING: All " << tcs.ignored() << " test case(s) were ignored. (All test "
+ "cases without auto-dequeue are ignored when recover-mode is selected.)" <<
+ std::endl;
+ }
else
std::cout << " (WARNING: This CSV file is empty or does not exist.)" << std::endl;
@@ -164,10 +171,12 @@
if (!summary)
std::cout << " === Results ===" << std::endl;
- for (test_case::res_map_citr i=tcp->jmap_begin(); i!=tcp->jmap_end(); i++)
- std::cout << (*i).second->str(summary, summary);
- if (tcp->num_jrnls() > 1)
- std::cout << tcp->average().str(summary, summary);
+// TODO - the reporting is broken when --repeat is used. The following commented-ot
+// section was an attempt to fix it, but there are too many side-effects.
+// for (test_case::res_map_citr i=tcp->jmap_begin(); i!=tcp->jmap_end(); i++)
+// std::cout << (*i).second->str(summary, summary);
+// if (tcp->num_jrnls() > 1)
+ std::cout << tcp->average().str(false, summary);
if (!summary)
std::cout << std::endl;
Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests 2008-02-22 17:03:16 UTC (rev 1723)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests 2008-02-22 17:21:51 UTC (rev 1724)
@@ -6,38 +6,22 @@
# Run jtt using default test set
echo
-echo "***** Mode 1: New journal instance, no recover, single journal *****"
+echo "***** Mode 1: New journal instance, no recover *****"
rm -rf /tmp/test_0*
-$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk || fail = 1
+$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk --num-jrnls 3 || fail = 1
echo
-#echo "***** Mode 2: New journal instance, no recover, multiple journals *****"
-#rm -rf /tmp/test_0*
-#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk --num-jrnls 5 || fail = 1
-#echo
-#echo "***** Mode 3: Reuse journal instance, no recover, single journal *****"
-#rm -rf /tmp/test_0*
-#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk || fail = 1
-#echo
-#echo "***** Mode 4: Reuse journal instance, no recover, multiple journals *****"
-#rm -rf /tmp/test_0*
-#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk --num-jrnls 5 || fail = 1
-#echo
-#echo "***** Mode 5: New journal instance, recover previous test journal, single journal *****"
-#rm -rf /tmp/test_0*
-#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk || fail = 1
-#echo
-#echo "***** Mode 6: New journal instance, recover previous test journal, multiple journals *****"
-#rm -rf /tmp/test_0*
-#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk --num-jrnls 5 || fail = 1
-#echo
-#echo "***** Mode 7: Reuse journal instance, recover previous test journal, single journal *****"
-#rm -rf /tmp/test_0*
-#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk || fail = 1
-#echo
-#echo "***** Mode 8: Reuse journal instance, recover previous test journal, multiple journals *****"
-#rm -rf /tmp/test_0*
-#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls 5 || fail = 1
-#echo
+echo "***** Mode 2: Reuse journal instance, no recover *****"
+rm -rf /tmp/test_0*
+$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk --num-jrnls 3 || fail = 1
+echo
+echo "***** Mode 3: New journal instance, recover previous test journal *****"
+rm -rf /tmp/test_0*
+$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk --num-jrnls 3 || fail = 1
+echo
+echo "***** Mode 4: Reuse journal instance, recover previous test journal *****"
+rm -rf /tmp/test_0*
+$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls 3 || fail = 1
+echo
# Run cpp-unit tests
LD_PRELOAD=$pwd/.libs/libdlclose_noop.so DllPlugInTester -c -b $pwd/.libs/Journal*Tests.so || fail=1
16 years, 10 months
rhmessaging commits: r1723 - in mgmt: cumin/python/wooly and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-02-22 12:03:16 -0500 (Fri, 22 Feb 2008)
New Revision: 1723
Added:
mgmt/cumin/python/wooly/wsgiserver.py
Modified:
mgmt/cumin/bin/cumin
mgmt/cumin/python/wooly/server.py
mgmt/notes/justin-todo.txt
Log:
Import the wsgi server impl from cherrypy.
Convert the wooly server to use it. For now, things like
if-modified-since are disabled.
Modified: mgmt/cumin/bin/cumin
===================================================================
--- mgmt/cumin/bin/cumin 2008-02-21 21:36:56 UTC (rev 1722)
+++ mgmt/cumin/bin/cumin 2008-02-22 17:03:16 UTC (rev 1723)
@@ -36,9 +36,12 @@
server = WebServer(app, port)
try:
- server.run()
+ server.start()
except KeyboardInterrupt:
- pass
+ server.stop()
+ except:
+ server.stop()
+ raise
def main():
if "-h" in sys.argv or "--help" in sys.argv:
Modified: mgmt/cumin/python/wooly/server.py
===================================================================
--- mgmt/cumin/python/wooly/server.py 2008-02-21 21:36:56 UTC (rev 1722)
+++ mgmt/cumin/python/wooly/server.py 2008-02-22 17:03:16 UTC (rev 1723)
@@ -5,42 +5,43 @@
from wooly import *
from devel import DevelPage
+from wsgiserver import CherryPyWSGIServer
-
class WebServer(object):
+ http_date = "%a, %d %b %Y %H:%M:%S %Z"
+
def __init__(self, app, port=8080):
self.app = app
self.port = port
- def run(self):
- server = HTTPServer(("", self.port), self.RequestHandler)
+ addr = ("localhost", self.port)
+ apps = [("", self.wsgi_app)]
+ self.__server = CherryPyWSGIServer(addr, apps)
+ self.__server.ssl_certificate = None # "/tmp/localhost.crt"
+ self.__server.ssl_private_key = None # "/tmp/localhost.key"
- # XXX hack, because HTTPServer and python conspire to make
- # this hard
- server.app = self.app
+ def start(self):
+ self.__server.start()
- print "Cumin server started on port %s" % (self.port)
+ print "Web server started on port %s" % (self.port)
- server.serve_forever()
+ def stop(self):
+ self.__server.stop()
- class RequestHandler(BaseHTTPRequestHandler):
- http_date = "%a, %d %b %Y %H:%M:%S %Z"
+ def wsgi_app(self, env, respond):
+ if env["PATH_INFO"].endswith(".html"):
+ print "------------------------------------"
+ for key in sorted(env):
+ print key, env[key]
- def do_GET(self):
- session = Session(self.server.app)
- session.unmarshal(self.path[1:])
+ session = Session(self.app)
+ session.unmarshal_page(env["PATH_INFO"])
+ session.unmarshal_url_vars(env["QUERY_STRING"])
- self.service(session)
-
- def do_POST(self):
- session = Session(self.server.app)
- session.unmarshal_page(self.path.split("?")[0])
-
- content_type = str(self.headers.getheader("content-type"))
-
- if content_type == "application/x-www-form-urlencoded":
- length = int(self.headers.getheader("content-length"))
- vars = self.rfile.read(length)
+ if env["REQUEST_METHOD"] == "POST":
+ if env["CONTENT_TYPE"] == "application/x-www-form-urlencoded":
+ length = int(env["CONTENT_LENGTH"])
+ vars = env["wsgi.input"].read(length)
else:
raise Exception("Content type '%s' is not supported" \
% content_type)
@@ -48,75 +49,68 @@
if vars:
session.unmarshal_url_vars(vars, "&")
- self.service(session)
+ page = session.get_page()
- def service(self, session):
- page = session.get_page()
+ try:
+ page.process(session, None)
+ except:
+ return self.error(session)
- try:
- page.process(session, None)
- except:
- self.error(session)
- return
+ redirect = page.get_redirect_url(session)
- redirect = page.get_redirect_url(session)
+ if redirect:
+ respond("303 See Other", [("Location", redirect)])
+ return ()
- if redirect:
- self.send_response(303)
- self.send_header("Location", redirect)
- self.end_headers()
- return
+# ims = self.headers.getheader("if-modified-since")
+# modified = page.get_last_modified(session).replace \
+# (microsecond=0)
- ims = self.headers.getheader("if-modified-since")
- modified = page.get_last_modified(session).replace \
- (microsecond=0)
+# if ims:
+# since = datetime(*strptime(str(ims), self.http_date)[0:6])
- if ims:
- since = datetime(*strptime(str(ims), self.http_date)[0:6])
+# if modified <= since:
+# self.send_response(304)
+# self.end_headers()
+# return
- if modified <= since:
- self.send_response(304)
- self.end_headers()
- return
+ try:
+ response = page.render(session, None)
+ except:
+ return self.error(session)
- try:
- response = page.render(session, None)
- except:
- self.error(session)
- return
+ headers = list()
- self.send_response(200)
+# if modified:
+# ts = modified.strftime("%a, %d %b %Y %H:%M:%S GMT")
+# list.append(("Last-Modified", ts))
- if modified:
- ts = modified.strftime("%a, %d %b %Y %H:%M:%S GMT")
- self.send_header("Last-Modified", ts)
+ type = page.get_content_type(session)
- type = page.get_content_type(session)
+ if type:
+ headers.append(("Content-Type", type))
- if type:
- self.send_header("Content-Type", type)
+ cache = page.get_cache_control(session)
- cache = page.get_cache_control(session)
+ if cache:
+ headers.append(("Cache-Control", cache))
- if cache:
- self.send_header("Cache-Control", cache)
+ respond("200 OK", headers)
- self.end_headers()
+ page.save_session(session)
- self.wfile.write(response)
+ return response
- page.save_session(session)
+ def error(self, session, respond):
+ respond("500 Error", [("Content-Type", "text/plain")])
- def error(self, session):
- self.send_response(500)
- self.send_header("Content-Type", "text/plain")
- self.end_headers()
-
- if session.debug:
- self.wfile.write("APPLICATION ERROR\n")
- self.wfile.write("\n----- python trace -----\n")
- print_exc(None, self.wfile)
- self.wfile.write("\n----- process trace -----\n")
- session.debug.print_process_calls(self.wfile)
- self.wfile.write("\n----- render trace -----\n")
- session.debug.print_render_calls(self.wfile)
+ if session.debug:
+ writer = Writer()
+ writer.write("APPLICATION ERROR\n")
+ writer.write("\n----- python trace -----\n")
+ print_exc(None, writer)
+ writer.write("\n----- process trace -----\n")
+ session.debug.print_process_calls(writer)
+ writer.write("\n----- render trace -----\n")
+ session.debug.print_render_calls(writer)
+ return writer.to_string()
Added: mgmt/cumin/python/wooly/wsgiserver.py
===================================================================
--- mgmt/cumin/python/wooly/wsgiserver.py (rev 0)
+++ mgmt/cumin/python/wooly/wsgiserver.py 2008-02-22 17:03:16 UTC (rev 1723)
@@ -0,0 +1,1046 @@
+# Copyright (c) 2004-2007, CherryPy Team (team(a)cherrypy.org)
+# All rights reserved.
+
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+
+# * Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+# * Neither the name of the CherryPy Team nor the names of its contributors
+# may be used to endorse or promote products derived from this software
+# without specific prior written permission.
+
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A high-speed, production ready, thread pooled, generic WSGI server.
+
+Simplest example on how to use this module directly
+(without using CherryPy's application machinery):
+
+ from cherrypy import wsgiserver
+
+ def my_crazy_app(environ, start_response):
+ status = '200 OK'
+ response_headers = [('Content-type','text/plain')]
+ start_response(status, response_headers)
+ return ['Hello world!\n']
+
+ # Here we set our application to the script_name '/'
+ wsgi_apps = [('/', my_crazy_app)]
+
+ server = wsgiserver.CherryPyWSGIServer(('localhost', 8070), wsgi_apps,
+ server_name='localhost')
+
+ # Want SSL support? Just set these attributes
+ # server.ssl_certificate = <filename>
+ # server.ssl_private_key = <filename>
+
+ if __name__ == '__main__':
+ try:
+ server.start()
+ except KeyboardInterrupt:
+ server.stop()
+
+This won't call the CherryPy engine (application side) at all, only the
+WSGI server, which is independant from the rest of CherryPy. Don't
+let the name "CherryPyWSGIServer" throw you; the name merely reflects
+its origin, not it's coupling.
+
+The CherryPy WSGI server can serve as many WSGI applications
+as you want in one instance:
+
+ wsgi_apps = [('/', my_crazy_app), ('/blog', my_blog_app)]
+
+"""
+
+
+import base64
+import Queue
+import os
+import re
+quoted_slash = re.compile("(?i)%2F")
+import rfc822
+import socket
+try:
+ import cStringIO as StringIO
+except ImportError:
+ import StringIO
+import sys
+import threading
+import time
+import traceback
+from urllib import unquote
+from urlparse import urlparse
+
+try:
+ from OpenSSL import SSL
+ from OpenSSL import crypto
+except ImportError:
+ SSL = None
+
+import errno
+socket_errors_to_ignore = []
+# Not all of these names will be defined for every platform.
+for _ in ("EPIPE", "ETIMEDOUT", "ECONNREFUSED", "ECONNRESET",
+ "EHOSTDOWN", "EHOSTUNREACH",
+ "WSAECONNABORTED", "WSAECONNREFUSED", "WSAECONNRESET",
+ "WSAENETRESET", "WSAETIMEDOUT"):
+ if _ in dir(errno):
+ socket_errors_to_ignore.append(getattr(errno, _))
+# de-dupe the list
+socket_errors_to_ignore = dict.fromkeys(socket_errors_to_ignore).keys()
+socket_errors_to_ignore.append("timed out")
+
+comma_separated_headers = ['ACCEPT', 'ACCEPT-CHARSET', 'ACCEPT-ENCODING',
+ 'ACCEPT-LANGUAGE', 'ACCEPT-RANGES', 'ALLOW', 'CACHE-CONTROL',
+ 'CONNECTION', 'CONTENT-ENCODING', 'CONTENT-LANGUAGE', 'EXPECT',
+ 'IF-MATCH', 'IF-NONE-MATCH', 'PRAGMA', 'PROXY-AUTHENTICATE', 'TE',
+ 'TRAILER', 'TRANSFER-ENCODING', 'UPGRADE', 'VARY', 'VIA', 'WARNING',
+ 'WWW-AUTHENTICATE']
+
+class HTTPRequest(object):
+ """An HTTP Request (and response).
+
+ A single HTTP connection may consist of multiple request/response pairs.
+
+ connection: the HTTP Connection object which spawned this request.
+ rfile: the 'read' fileobject from the connection's socket
+ ready: when True, the request has been parsed and is ready to begin
+ generating the response. When False, signals the calling Connection
+ that the response should not be generated and the connection should
+ close.
+ close_connection: signals the calling Connection that the request
+ should close. This does not imply an error! The client and/or
+ server may each request that the connection be closed.
+ chunked_write: if True, output will be encoded with the "chunked"
+ transfer-coding. This value is set automatically inside
+ send_headers.
+ """
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.rfile = self.connection.rfile
+ self.sendall = self.connection.sendall
+ self.environ = connection.environ.copy()
+
+ self.ready = False
+ self.started_response = False
+ self.status = ""
+ self.outheaders = []
+ self.sent_headers = False
+ self.close_connection = False
+ self.chunked_write = False
+
+ def parse_request(self):
+ """Parse the next HTTP request start-line and message-headers."""
+ # HTTP/1.1 connections are persistent by default. If a client
+ # requests a page, then idles (leaves the connection open),
+ # then rfile.readline() will raise socket.error("timed out").
+ # Note that it does this based on the value given to settimeout(),
+ # and doesn't need the client to request or acknowledge the close
+ # (although your TCP stack might suffer for it: cf Apache's history
+ # with FIN_WAIT_2).
+ request_line = self.rfile.readline()
+ if not request_line:
+ # Force self.ready = False so the connection will close.
+ self.ready = False
+ return
+
+ if request_line == "\r\n":
+ # RFC 2616 sec 4.1: "...if the server is reading the protocol
+ # stream at the beginning of a message and receives a CRLF
+ # first, it should ignore the CRLF."
+ # But only ignore one leading line! else we enable a DoS.
+ request_line = self.rfile.readline()
+ if not request_line:
+ self.ready = False
+ return
+
+ server = self.connection.server
+ environ = self.environ
+ environ["SERVER_SOFTWARE"] = "%s WSGI Server" % server.version
+
+ method, path, req_protocol = request_line.strip().split(" ", 2)
+ environ["REQUEST_METHOD"] = method
+
+ # path may be an abs_path (including "http://host.domain.tld");
+ scheme, location, path, params, qs, frag = urlparse(path)
+
+ if frag:
+ self.simple_response("400 Bad Request",
+ "Illegal #fragment in Request-URI.")
+ return
+
+ if scheme:
+ environ["wsgi.url_scheme"] = scheme
+ if params:
+ path = path + ";" + params
+
+ # Unquote the path+params (e.g. "/this%20path" -> "this path").
+ # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
+ #
+ # But note that "...a URI must be separated into its components
+ # before the escaped characters within those components can be
+ # safely decoded." http://www.ietf.org/rfc/rfc2396.txt, sec 2.4.2
+ atoms = [unquote(x) for x in quoted_slash.split(path)]
+ path = "%2F".join(atoms)
+
+ if path == "*":
+ # This means, of course, that the last wsgi_app (shortest path)
+ # will always handle a URI of "*".
+ environ["SCRIPT_NAME"] = ""
+ environ["PATH_INFO"] = "*"
+ self.wsgi_app = server.mount_points[-1][1]
+ else:
+ for mount_point, wsgi_app in server.mount_points:
+ # The mount_points list should be sorted by length, descending.
+ if path.startswith(mount_point + "/") or path == mount_point:
+ environ["SCRIPT_NAME"] = mount_point
+ environ["PATH_INFO"] = path[len(mount_point):]
+ self.wsgi_app = wsgi_app
+ break
+ else:
+ self.simple_response("404 Not Found")
+ return
+
+ # Note that, like wsgiref and most other WSGI servers,
+ # we unquote the path but not the query string.
+ environ["QUERY_STRING"] = qs
+
+ # Compare request and server HTTP protocol versions, in case our
+ # server does not support the requested protocol. Limit our output
+ # to min(req, server). We want the following output:
+ # request server actual written supported response
+ # protocol protocol response protocol feature set
+ # a 1.0 1.0 1.0 1.0
+ # b 1.0 1.1 1.1 1.0
+ # c 1.1 1.0 1.0 1.0
+ # d 1.1 1.1 1.1 1.1
+ # Notice that, in (b), the response will be "HTTP/1.1" even though
+ # the client only understands 1.0. RFC 2616 10.5.6 says we should
+ # only return 505 if the _major_ version is different.
+ rp = int(req_protocol[5]), int(req_protocol[7])
+ sp = int(server.protocol[5]), int(server.protocol[7])
+ if sp[0] != rp[0]:
+ self.simple_response("505 HTTP Version Not Supported")
+ return
+ # Bah. "SERVER_PROTOCOL" is actually the REQUEST protocol.
+ environ["SERVER_PROTOCOL"] = req_protocol
+ # set a non-standard environ entry so the WSGI app can know what
+ # the *real* server protocol is (and what features to support).
+ # See http://www.faqs.org/rfcs/rfc2145.html.
+ environ["ACTUAL_SERVER_PROTOCOL"] = server.protocol
+ self.response_protocol = "HTTP/%s.%s" % min(rp, sp)
+
+ # If the Request-URI was an absoluteURI, use its location atom.
+ if location:
+ environ["SERVER_NAME"] = location
+
+ # then all the http headers
+ try:
+ self.read_headers()
+ except ValueError, ex:
+ self.simple_response("400 Bad Request", repr(ex.args))
+ return
+
+ creds = environ.get("HTTP_AUTHORIZATION", "").split(" ", 1)
+ environ["AUTH_TYPE"] = creds[0]
+ if creds[0].lower() == 'basic':
+ user, pw = base64.decodestring(creds[1]).split(":", 1)
+ environ["REMOTE_USER"] = user
+
+ # Persistent connection support
+ if self.response_protocol == "HTTP/1.1":
+ if environ.get("HTTP_CONNECTION", "") == "close":
+ self.close_connection = True
+ else:
+ # HTTP/1.0
+ if environ.get("HTTP_CONNECTION", "") != "Keep-Alive":
+ self.close_connection = True
+
+ # Transfer-Encoding support
+ te = None
+ if self.response_protocol == "HTTP/1.1":
+ te = environ.get("HTTP_TRANSFER_ENCODING")
+ if te:
+ te = [x.strip().lower() for x in te.split(",") if x.strip()]
+
+ read_chunked = False
+
+ if te:
+ for enc in te:
+ if enc == "chunked":
+ read_chunked = True
+ else:
+ # Note that, even if we see "chunked", we must reject
+ # if there is an extension we don't recognize.
+ self.simple_response("501 Unimplemented")
+ self.close_connection = True
+ return
+
+ if read_chunked:
+ if not self.decode_chunked():
+ return
+
+ # From PEP 333:
+ # "Servers and gateways that implement HTTP 1.1 must provide
+ # transparent support for HTTP 1.1's "expect/continue" mechanism.
+ # This may be done in any of several ways:
+ # 1. Respond to requests containing an Expect: 100-continue request
+ # with an immediate "100 Continue" response, and proceed normally.
+ # 2. Proceed with the request normally, but provide the application
+ # with a wsgi.input stream that will send the "100 Continue"
+ # response if/when the application first attempts to read from
+ # the input stream. The read request must then remain blocked
+ # until the client responds.
+ # 3. Wait until the client decides that the server does not support
+ # expect/continue, and sends the request body on its own.
+ # (This is suboptimal, and is not recommended.)
+ #
+ # We used to do 3, but are now doing 1. Maybe we'll do 2 someday,
+ # but it seems like it would be a big slowdown for such a rare case.
+ if environ.get("HTTP_EXPECT", "") == "100-continue":
+ self.simple_response(100)
+
+ self.ready = True
+
+ def read_headers(self):
+ """Read header lines from the incoming stream."""
+ environ = self.environ
+
+ while True:
+ line = self.rfile.readline()
+ if not line:
+ # No more data--illegal end of headers
+ raise ValueError("Illegal end of headers.")
+
+ if line == '\r\n':
+ # Normal end of headers
+ break
+
+ if line[0] in ' \t':
+ # It's a continuation line.
+ v = line.strip()
+ else:
+ k, v = line.split(":", 1)
+ k, v = k.strip().upper(), v.strip()
+ envname = "HTTP_" + k.replace("-", "_")
+
+ if k in comma_separated_headers:
+ existing = environ.get(envname)
+ if existing:
+ v = ", ".join((existing, v))
+ environ[envname] = v
+
+ ct = environ.pop("HTTP_CONTENT_TYPE", None)
+ if ct:
+ environ["CONTENT_TYPE"] = ct
+ cl = environ.pop("HTTP_CONTENT_LENGTH", None)
+ if cl:
+ environ["CONTENT_LENGTH"] = cl
+
+ def decode_chunked(self):
+ """Decode the 'chunked' transfer coding."""
+ cl = 0
+ data = StringIO.StringIO()
+ while True:
+ line = self.rfile.readline().strip().split(";", 1)
+ chunk_size = int(line.pop(0), 16)
+ if chunk_size <= 0:
+ break
+## if line: chunk_extension = line[0]
+ cl += chunk_size
+ data.write(self.rfile.read(chunk_size))
+ crlf = self.rfile.read(2)
+ if crlf != "\r\n":
+ self.simple_response("400 Bad Request",
+ "Bad chunked transfer coding "
+ "(expected '\\r\\n', got %r)" % crlf)
+ return
+
+ # Grab any trailer headers
+ self.read_headers()
+
+ data.seek(0)
+ self.environ["wsgi.input"] = data
+ self.environ["CONTENT_LENGTH"] = str(cl) or ""
+ return True
+
+ def respond(self):
+ """Call the appropriate WSGI app and write its iterable output."""
+ response = self.wsgi_app(self.environ, self.start_response)
+ try:
+ for chunk in response:
+ # "The start_response callable must not actually transmit
+ # the response headers. Instead, it must store them for the
+ # server or gateway to transmit only after the first
+ # iteration of the application return value that yields
+ # a NON-EMPTY string, or upon the application's first
+ # invocation of the write() callable." (PEP 333)
+ if chunk:
+ self.write(chunk)
+ finally:
+ if hasattr(response, "close"):
+ response.close()
+ if (self.ready and not self.sent_headers
+ and not self.connection.server.interrupt):
+ self.sent_headers = True
+ self.send_headers()
+ if self.chunked_write:
+ self.sendall("0\r\n\r\n")
+
+ def simple_response(self, status, msg=""):
+ """Write a simple response back to the client."""
+ status = str(status)
+ buf = ["%s %s\r\n" % (self.connection.server.protocol, status),
+ "Content-Length: %s\r\n" % len(msg)]
+
+ if status[:3] == "413" and self.response_protocol == 'HTTP/1.1':
+ # Request Entity Too Large
+ self.close_connection = True
+ buf.append("Connection: close\r\n")
+
+ buf.append("\r\n")
+ if msg:
+ buf.append(msg)
+ self.sendall("".join(buf))
+
+ def start_response(self, status, headers, exc_info = None):
+ """WSGI callable to begin the HTTP response."""
+ if self.started_response:
+ if not exc_info:
+ raise AssertionError("WSGI start_response called a second "
+ "time with no exc_info.")
+ else:
+ try:
+ raise exc_info[0], exc_info[1], exc_info[2]
+ finally:
+ exc_info = None
+ self.started_response = True
+ self.status = status
+ self.outheaders.extend(headers)
+ return self.write
+
+ def write(self, chunk):
+ """WSGI callable to write unbuffered data to the client.
+
+ This method is also used internally by start_response (to write
+ data from the iterable returned by the WSGI application).
+ """
+ if not self.started_response:
+ raise AssertionError("WSGI write called before start_response.")
+
+ if not self.sent_headers:
+ self.sent_headers = True
+ self.send_headers()
+
+ if self.chunked_write and chunk:
+ buf = [hex(len(chunk))[2:], "\r\n", chunk, "\r\n"]
+ self.sendall("".join(buf))
+ else:
+ self.sendall(chunk)
+
+ def send_headers(self):
+ """Assert, process, and send the HTTP response message-headers."""
+ hkeys = [key.lower() for key, value in self.outheaders]
+ status = int(self.status[:3])
+
+ if status == 413:
+ # Request Entity Too Large. Close conn to avoid garbage.
+ self.close_connection = True
+ elif "content-length" not in hkeys:
+ # "All 1xx (informational), 204 (no content),
+ # and 304 (not modified) responses MUST NOT
+ # include a message-body." So no point chunking.
+ if status < 200 or status in (204, 205, 304):
+ pass
+ else:
+ if self.response_protocol == 'HTTP/1.1':
+ # Use the chunked transfer-coding
+ self.chunked_write = True
+ self.outheaders.append(("Transfer-Encoding", "chunked"))
+ else:
+ # Closing the conn is the only way to determine len.
+ self.close_connection = True
+
+ if "connection" not in hkeys:
+ if self.response_protocol == 'HTTP/1.1':
+ if self.close_connection:
+ self.outheaders.append(("Connection", "close"))
+ else:
+ if not self.close_connection:
+ self.outheaders.append(("Connection", "Keep-Alive"))
+
+ if "date" not in hkeys:
+ self.outheaders.append(("Date", rfc822.formatdate()))
+
+ server = self.connection.server
+
+ if "server" not in hkeys:
+ self.outheaders.append(("Server", server.version))
+
+ buf = [server.protocol, " ", self.status, "\r\n"]
+ try:
+ buf += [k + ": " + v + "\r\n" for k, v in self.outheaders]
+ except TypeError:
+ if not isinstance(k, str):
+ raise TypeError("WSGI response header key %r is not a string.")
+ if not isinstance(v, str):
+ raise TypeError("WSGI response header value %r is not a string.")
+ else:
+ raise
+ buf.append("\r\n")
+ self.sendall("".join(buf))
+
+
+class NoSSLError(Exception):
+ """Exception raised when a client speaks HTTP to an HTTPS socket."""
+ pass
+
+
+def _ssl_wrap_method(method, is_reader=False):
+ """Wrap the given method with SSL error-trapping.
+
+ is_reader: if False (the default), EOF errors will be raised.
+ If True, EOF errors will return "" (to emulate normal sockets).
+ """
+ def ssl_method_wrapper(self, *args, **kwargs):
+## print (id(self), method, args, kwargs)
+ start = time.time()
+ while True:
+ try:
+ return method(self, *args, **kwargs)
+ except (SSL.WantReadError, SSL.WantWriteError):
+ # Sleep and try again. This is dangerous, because it means
+ # the rest of the stack has no way of differentiating
+ # between a "new handshake" error and "client dropped".
+ # Note this isn't an endless loop: there's a timeout below.
+ time.sleep(self.ssl_retry)
+ except SSL.SysCallError, e:
+ if is_reader and e.args == (-1, 'Unexpected EOF'):
+ return ""
+
+ errno = e.args[0]
+ if is_reader and errno in socket_errors_to_ignore:
+ return ""
+ raise socket.error(errno)
+ except SSL.Error, e:
+ if is_reader and e.args == (-1, 'Unexpected EOF'):
+ return ""
+
+ thirdarg = None
+ try:
+ thirdarg = e.args[0][0][2]
+ except IndexError:
+ pass
+
+ if is_reader and thirdarg == 'ssl handshake failure':
+ return ""
+ if thirdarg == 'http request':
+ # The client is talking HTTP to an HTTPS server.
+ raise NoSSLError()
+ raise
+ if time.time() - start > self.ssl_timeout:
+ raise socket.timeout("timed out")
+ return ssl_method_wrapper
+
+class SSL_fileobject(socket._fileobject):
+ """Faux file object attached to a socket object."""
+
+ ssl_timeout = 3
+ ssl_retry = .01
+
+ close = _ssl_wrap_method(socket._fileobject.close)
+ flush = _ssl_wrap_method(socket._fileobject.flush)
+ write = _ssl_wrap_method(socket._fileobject.write)
+ writelines = _ssl_wrap_method(socket._fileobject.writelines)
+ read = _ssl_wrap_method(socket._fileobject.read, is_reader=True)
+ readline = _ssl_wrap_method(socket._fileobject.readline, is_reader=True)
+ readlines = _ssl_wrap_method(socket._fileobject.readlines, is_reader=True)
+
+
+class HTTPConnection(object):
+ """An HTTP connection (active socket).
+
+ socket: the raw socket object (usually TCP) for this connection.
+ addr: the "bind address" for the remote end of the socket.
+ For IP sockets, this is a tuple of (REMOTE_ADDR, REMOTE_PORT).
+ For UNIX domain sockets, this will be a string.
+ server: the HTTP Server for this Connection. Usually, the server
+ object possesses a passive (server) socket which spawns multiple,
+ active (client) sockets, one for each connection.
+
+ environ: a WSGI environ template. This will be copied for each request.
+ rfile: a fileobject for reading from the socket.
+ sendall: a function for writing (+ flush) to the socket.
+ """
+
+ rbufsize = -1
+ RequestHandlerClass = HTTPRequest
+ environ = {"wsgi.version": (1, 0),
+ "wsgi.url_scheme": "http",
+ "wsgi.multithread": True,
+ "wsgi.multiprocess": False,
+ "wsgi.run_once": False,
+ "wsgi.errors": sys.stderr,
+ }
+
+ def __init__(self, sock, addr, server):
+ self.socket = sock
+ self.addr = addr
+ self.server = server
+
+ # Copy the class environ into self.
+ self.environ = self.environ.copy()
+
+ if SSL and isinstance(sock, SSL.ConnectionType):
+ timeout = sock.gettimeout()
+ self.rfile = SSL_fileobject(sock, "r", self.rbufsize)
+ self.rfile.ssl_timeout = timeout
+ self.sendall = _ssl_wrap_method(sock.sendall)
+ self.environ["wsgi.url_scheme"] = "https"
+ self.environ["HTTPS"] = "on"
+ sslenv = getattr(server, "ssl_environ", None)
+ if sslenv:
+ self.environ.update(sslenv)
+ else:
+ self.rfile = sock.makefile("rb", self.rbufsize)
+ self.sendall = sock.sendall
+
+ self.environ.update({"wsgi.input": self.rfile,
+ "SERVER_NAME": self.server.server_name,
+ })
+
+ if isinstance(self.server.bind_addr, basestring):
+ # AF_UNIX. This isn't really allowed by WSGI, which doesn't
+ # address unix domain sockets. But it's better than nothing.
+ self.environ["SERVER_PORT"] = ""
+ else:
+ self.environ["SERVER_PORT"] = str(self.server.bind_addr[1])
+ # optional values
+ # Until we do DNS lookups, omit REMOTE_HOST
+ self.environ["REMOTE_ADDR"] = self.addr[0]
+ self.environ["REMOTE_PORT"] = str(self.addr[1])
+
+ def communicate(self):
+ """Read each request and respond appropriately."""
+ try:
+ while True:
+ # (re)set req to None so that if something goes wrong in
+ # the RequestHandlerClass constructor, the error doesn't
+ # get written to the previous request.
+ req = None
+ req = self.RequestHandlerClass(self)
+ # This order of operations should guarantee correct pipelining.
+ req.parse_request()
+ if not req.ready:
+ return
+ req.respond()
+ if req.close_connection:
+ return
+ except socket.error, e:
+ errno = e.args[0]
+ if errno not in socket_errors_to_ignore:
+ if req:
+ req.simple_response("500 Internal Server Error",
+ format_exc())
+ return
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except NoSSLError:
+ # Unwrap our sendall
+ req.sendall = self.socket._sock.sendall
+ req.simple_response("400 Bad Request",
+ "The client sent a plain HTTP request, but "
+ "this server only speaks HTTPS on this port.")
+ except:
+ if req:
+ req.simple_response("500 Internal Server Error", format_exc())
+
+ def close(self):
+ """Close the socket underlying this connection."""
+ self.rfile.close()
+ self.socket.close()
+
+
+def format_exc(limit=None):
+ """Like print_exc() but return a string. Backport for Python 2.3."""
+ try:
+ etype, value, tb = sys.exc_info()
+ return ''.join(traceback.format_exception(etype, value, tb, limit))
+ finally:
+ etype = value = tb = None
+
+
+_SHUTDOWNREQUEST = None
+
+class WorkerThread(threading.Thread):
+ """Thread which continuously polls a Queue for Connection objects.
+
+ server: the HTTP Server which spawned this thread, and which owns the
+ Queue and is placing active connections into it.
+ ready: a simple flag for the calling server to know when this thread
+ has begun polling the Queue.
+
+ Due to the timing issues of polling a Queue, a WorkerThread does not
+ check its own 'ready' flag after it has started. To stop the thread,
+ it is necessary to stick a _SHUTDOWNREQUEST object onto the Queue
+ (one for each running WorkerThread).
+ """
+
+ def __init__(self, server):
+ self.ready = False
+ self.server = server
+ threading.Thread.__init__(self)
+
+ def run(self):
+ try:
+ self.ready = True
+ while True:
+ conn = self.server.requests.get()
+ if conn is _SHUTDOWNREQUEST:
+ return
+
+ try:
+ conn.communicate()
+ finally:
+ conn.close()
+ except (KeyboardInterrupt, SystemExit), exc:
+ self.server.interrupt = exc
+
+
+class SSLConnection:
+ """A thread-safe wrapper for an SSL.Connection.
+
+ *args: the arguments to create the wrapped SSL.Connection(*args).
+ """
+
+ def __init__(self, *args):
+ self._ssl_conn = SSL.Connection(*args)
+ self._lock = threading.RLock()
+
+ for f in ('get_context', 'pending', 'send', 'write', 'recv', 'read',
+ 'renegotiate', 'bind', 'listen', 'connect', 'accept',
+ 'setblocking', 'fileno', 'shutdown', 'close', 'get_cipher_list',
+ 'getpeername', 'getsockname', 'getsockopt', 'setsockopt',
+ 'makefile', 'get_app_data', 'set_app_data', 'state_string',
+ 'sock_shutdown', 'get_peer_certificate', 'want_read',
+ 'want_write', 'set_connect_state', 'set_accept_state',
+ 'connect_ex', 'sendall', 'settimeout'):
+ exec """def %s(self, *args):
+ self._lock.acquire()
+ try:
+ return self._ssl_conn.%s(*args)
+ finally:
+ self._lock.release()
+""" % (f, f)
+
+
+class CherryPyWSGIServer(object):
+ """An HTTP server for WSGI.
+
+ bind_addr: a (host, port) tuple if TCP sockets are desired;
+ for UNIX sockets, supply the filename as a string.
+ wsgi_app: the WSGI 'application callable'; multiple WSGI applications
+ may be passed as (script_name, callable) pairs.
+ numthreads: the number of worker threads to create (default 10).
+ server_name: the string to set for WSGI's SERVER_NAME environ entry.
+ Defaults to socket.gethostname().
+ max: the maximum number of queued requests (defaults to -1 = no limit).
+ request_queue_size: the 'backlog' argument to socket.listen();
+ specifies the maximum number of queued connections (default 5).
+ timeout: the timeout in seconds for accepted connections (default 10).
+
+ protocol: the version string to write in the Status-Line of all
+ HTTP responses. For example, "HTTP/1.1" (the default). This
+ also limits the supported features used in the response.
+
+
+ SSL/HTTPS
+ ---------
+ The OpenSSL module must be importable for SSL functionality.
+ You can obtain it from http://pyopenssl.sourceforge.net/
+
+ ssl_certificate: the filename of the server SSL certificate.
+ ssl_privatekey: the filename of the server's private key file.
+
+ If either of these is None (both are None by default), this server
+ will not use SSL. If both are given and are valid, they will be read
+ on server start and used in the SSL context for the listening socket.
+ """
+
+ protocol = "HTTP/1.1"
+ version = "CherryPy/3.0.3"
+ ready = False
+ _interrupt = None
+ ConnectionClass = HTTPConnection
+
+ # Paths to certificate and private key files
+ ssl_certificate = None
+ ssl_private_key = None
+
+ def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
+ max=-1, request_queue_size=5, timeout=10):
+ self.requests = Queue.Queue(max)
+
+ if callable(wsgi_app):
+ # We've been handed a single wsgi_app, in CP-2.1 style.
+ # Assume it's mounted at "".
+ self.mount_points = [("", wsgi_app)]
+ else:
+ # We've been handed a list of (mount_point, wsgi_app) tuples,
+ # so that the server can call different wsgi_apps, and also
+ # correctly set SCRIPT_NAME.
+ self.mount_points = wsgi_app
+ self.mount_points.sort()
+ self.mount_points.reverse()
+
+ self.bind_addr = bind_addr
+ self.numthreads = numthreads or 1
+ if not server_name:
+ server_name = socket.gethostname()
+ self.server_name = server_name
+ self.request_queue_size = request_queue_size
+ self._workerThreads = []
+
+ self.timeout = timeout
+
+ def start(self):
+ """Run the server forever."""
+ # We don't have to trap KeyboardInterrupt or SystemExit here,
+ # because cherrpy.server already does so, calling self.stop() for us.
+ # If you're using this server with another framework, you should
+ # trap those exceptions in whatever code block calls start().
+ self._interrupt = None
+
+ # Select the appropriate socket
+ if isinstance(self.bind_addr, basestring):
+ # AF_UNIX socket
+
+ # So we can reuse the socket...
+ try: os.unlink(self.bind_addr)
+ except: pass
+
+ # So everyone can access the socket...
+ try: os.chmod(self.bind_addr, 0777)
+ except: pass
+
+ info = [(socket.AF_UNIX, socket.SOCK_STREAM, 0, "", self.bind_addr)]
+ else:
+ # AF_INET or AF_INET6 socket
+ # Get the correct address family for our host (allows IPv6 addresses)
+ host, port = self.bind_addr
+ flags = 0
+ if host == '':
+ # Despite the socket module docs, using '' does not
+ # allow AI_PASSIVE to work. Passing None instead
+ # returns '0.0.0.0' like we want. In other words:
+ # host AI_PASSIVE result
+ # '' Y 192.168.x.y
+ # '' N 192.168.x.y
+ # None Y 0.0.0.0
+ # None N 127.0.0.1
+ host = None
+ flags = socket.AI_PASSIVE
+ try:
+ info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM, 0, flags)
+ except socket.gaierror:
+ # Probably a DNS issue. Assume IPv4.
+ info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", self.bind_addr)]
+
+ self.socket = None
+ msg = "No socket could be created"
+ for res in info:
+ af, socktype, proto, canonname, sa = res
+ try:
+ self.bind(af, socktype, proto)
+ except socket.error, msg:
+ if self.socket:
+ self.socket.close()
+ self.socket = None
+ continue
+ break
+ if not self.socket:
+ raise socket.error, msg
+
+ # Timeout so KeyboardInterrupt can be caught on Win32
+ self.socket.settimeout(1)
+ self.socket.listen(self.request_queue_size)
+
+ # Create worker threads
+ for i in xrange(self.numthreads):
+ self._workerThreads.append(WorkerThread(self))
+ for worker in self._workerThreads:
+ worker.setName("CP WSGIServer " + worker.getName())
+ worker.start()
+ for worker in self._workerThreads:
+ while not worker.ready:
+ time.sleep(.1)
+
+ self.ready = True
+ while self.ready:
+ self.tick()
+ if self.interrupt:
+ while self.interrupt is True:
+ # Wait for self.stop() to complete. See _set_interrupt.
+ time.sleep(0.1)
+ raise self.interrupt
+
+ def bind(self, family, type, proto=0):
+ """Create (or recreate) the actual socket object."""
+ self.socket = socket.socket(family, type, proto)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+## self.socket.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1)
+ if self.ssl_certificate and self.ssl_private_key:
+ if SSL is None:
+ raise ImportError("You must install pyOpenSSL to use HTTPS.")
+
+ # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ ctx.use_privatekey_file(self.ssl_private_key)
+ ctx.use_certificate_file(self.ssl_certificate)
+ self.socket = SSLConnection(ctx, self.socket)
+ self.populate_ssl_environ()
+ self.socket.bind(self.bind_addr)
+
+ def tick(self):
+ """Accept a new connection and put it on the Queue."""
+ try:
+ s, addr = self.socket.accept()
+ if not self.ready:
+ return
+ if hasattr(s, 'settimeout'):
+ s.settimeout(self.timeout)
+ conn = self.ConnectionClass(s, addr, self)
+ self.requests.put(conn)
+ except socket.timeout:
+ # The only reason for the timeout in start() is so we can
+ # notice keyboard interrupts on Win32, which don't interrupt
+ # accept() by default
+ return
+ except socket.error, x:
+ msg = x.args[1]
+ if msg in ("Bad file descriptor", "Socket operation on non-socket"):
+ # Our socket was closed.
+ return
+ if msg == "Resource temporarily unavailable":
+ # Just try again. See http://www.cherrypy.org/ticket/479.
+ return
+ raise
+
+ def _get_interrupt(self):
+ return self._interrupt
+ def _set_interrupt(self, interrupt):
+ self._interrupt = True
+ self.stop()
+ self._interrupt = interrupt
+ interrupt = property(_get_interrupt, _set_interrupt,
+ doc="Set this to an Exception instance to "
+ "interrupt the server.")
+
+ def stop(self):
+ """Gracefully shutdown a server that is serving forever."""
+ self.ready = False
+
+ sock = getattr(self, "socket", None)
+ if sock:
+ if not isinstance(self.bind_addr, basestring):
+ # Touch our own socket to make accept() return immediately.
+ try:
+ host, port = sock.getsockname()[:2]
+ except socket.error, x:
+ if x.args[1] != "Bad file descriptor":
+ raise
+ else:
+ # Note that we're explicitly NOT using AI_PASSIVE,
+ # here, because we want an actual IP to touch.
+ # localhost won't work if we've bound to a public IP,
+ # but it would if we bound to INADDR_ANY via host = ''.
+ for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM):
+ af, socktype, proto, canonname, sa = res
+ s = None
+ try:
+ s = socket.socket(af, socktype, proto)
+ # See http://groups.google.com/group/cherrypy-users/
+ # browse_frm/thread/bbfe5eb39c904fe0
+ s.settimeout(1.0)
+ s.connect((host, port))
+ s.close()
+ except socket.error:
+ if s:
+ s.close()
+ if hasattr(sock, "close"):
+ sock.close()
+ self.socket = None
+
+ # Must shut down threads here so the code that calls
+ # this method can know when all threads are stopped.
+ for worker in self._workerThreads:
+ self.requests.put(_SHUTDOWNREQUEST)
+
+ # Don't join currentThread (when stop is called inside a request).
+ current = threading.currentThread()
+ while self._workerThreads:
+ worker = self._workerThreads.pop()
+ if worker is not current and worker.isAlive():
+ try:
+ worker.join()
+ except AssertionError:
+ pass
+
+ def populate_ssl_environ(self):
+ """Create WSGI environ entries to be merged into each request."""
+ cert = open(self.ssl_certificate).read()
+ cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
+ self.ssl_environ = {
+ # pyOpenSSL doesn't provide access to any of these AFAICT
+## 'SSL_PROTOCOL': 'SSLv2',
+## SSL_CIPHER string The cipher specification name
+## SSL_VERSION_INTERFACE string The mod_ssl program version
+## SSL_VERSION_LIBRARY string The OpenSSL program version
+ }
+
+ # Server certificate attributes
+ self.ssl_environ.update({
+ 'SSL_SERVER_M_VERSION': cert.get_version(),
+ 'SSL_SERVER_M_SERIAL': cert.get_serial_number(),
+## 'SSL_SERVER_V_START': Validity of server's certificate (start time),
+## 'SSL_SERVER_V_END': Validity of server's certificate (end time),
+ })
+
+ for prefix, dn in [("I", cert.get_issuer()),
+ ("S", cert.get_subject())]:
+ # X509Name objects don't seem to have a way to get the
+ # complete DN string. Use str() and slice it instead,
+ # because str(dn) == "<X509Name object '/C=US/ST=...'>"
+ dnstr = str(dn)[18:-2]
+
+ wsgikey = 'SSL_SERVER_%s_DN' % prefix
+ self.ssl_environ[wsgikey] = dnstr
+
+ # The DN should be of the form: /k1=v1/k2=v2, but we must allow
+ # for any value to contain slashes itself (in a URL).
+ while dnstr:
+ pos = dnstr.rfind("=")
+ dnstr, value = dnstr[:pos], dnstr[pos + 1:]
+ pos = dnstr.rfind("/")
+ dnstr, key = dnstr[:pos], dnstr[pos + 1:]
+ if key and value:
+ wsgikey = 'SSL_SERVER_%s_DN_%s' % (prefix, key)
+ self.ssl_environ[wsgikey] = value
+
+
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2008-02-21 21:36:56 UTC (rev 1722)
+++ mgmt/notes/justin-todo.txt 2008-02-22 17:03:16 UTC (rev 1723)
@@ -18,6 +18,8 @@
Deferred
+ * Need to add cherrypy bsd license to binary dist?
+
* See if we can't avoid the app.add_parameter in Parameter; adding to
page somehow would make a lot more sense
16 years, 10 months
rhmessaging commits: r1722 - mgmt/cumin/python/wooly.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-02-21 16:36:56 -0500 (Thu, 21 Feb 2008)
New Revision: 1722
Modified:
mgmt/cumin/python/wooly/__init__.py
Log:
Move the path string accommodation to a better place
Modified: mgmt/cumin/python/wooly/__init__.py
===================================================================
--- mgmt/cumin/python/wooly/__init__.py 2008-02-21 21:24:33 UTC (rev 1721)
+++ mgmt/cumin/python/wooly/__init__.py 2008-02-21 21:36:56 UTC (rev 1722)
@@ -458,6 +458,8 @@
self.pages[page.name] = page
def get_page(self, name):
+ #print "Looking for", name, "in", self.pages
+
return self.pages.get(name, self.default_page)
def set_default_page(self, page):
@@ -629,9 +631,6 @@
return separator.join(vars)
def unmarshal(self, string):
- if string.startswith("/"):
- string = string[1:]
-
elems = string.split("?")
self.unmarshal_page(elems[0])
@@ -642,6 +641,9 @@
pass
def unmarshal_page(self, string):
+ if string.startswith("/"):
+ string = string[1:]
+
self.set_page(self.app.get_page(string))
def unmarshal_url_vars(self, string, separator=";"):
16 years, 10 months