Author: jfrederic.clere(a)jboss.com
Date: 2008-04-24 09:29:32 -0400 (Thu, 24 Apr 2008)
New Revision: 1560
Modified:
sandbox/httpd/src/native/mod_proxy_cluster/mod_proxy_cluster.c
Log:
Add a thread per process to handle the reversed connection.
Create the server socket and the worker in the new thread.
Modified: sandbox/httpd/src/native/mod_proxy_cluster/mod_proxy_cluster.c
===================================================================
--- sandbox/httpd/src/native/mod_proxy_cluster/mod_proxy_cluster.c 2008-04-24 08:29:42 UTC
(rev 1559)
+++ sandbox/httpd/src/native/mod_proxy_cluster/mod_proxy_cluster.c 2008-04-24 13:29:32 UTC
(rev 1560)
@@ -65,7 +65,7 @@
* Create/Get the worker before using it
*/
static void create_worker(proxy_server_conf *conf, proxy_balancer *balancer,
- request_rec *r, proxy_worker **worker,
+ server_rec *server, proxy_worker **worker,
nodeinfo_t *node)
{
char url[6+64+7+4]; /* Type :// Host : Port */
@@ -85,7 +85,7 @@
/* creates it */
const char *err = ap_proxy_add_worker(worker, conf->pool, conf, url);
if (err) {
- ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
"Created: worker for %s failed: %s", url, err);
return;
}
@@ -95,7 +95,7 @@
ptr = ptr + node->offset;
proxy_worker->s = (proxy_worker_stat *) ptr;
proxy_worker->id = node->mess.id;
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: worker for %s", url);
/*
* XXX: The Shared datastatus may already contains a valid information
@@ -118,14 +118,14 @@
int port = atoi(node->mess.Port);
int i;
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Using reversed worker for %s", url);
/* XXX: change 10 into a CONFIG received value */
for (i=0; i<10; i++) {
rv = apr_socket_create(&listener, APR_INET, SOCK_STREAM,
APR_PROTO_TCP, conf->pool);
if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Unable to create server socket");
return;
}
@@ -133,38 +133,38 @@
rv = apr_sockaddr_info_get(&sockAddr, node->mess.Host,
APR_UNSPEC,
port + i, 0, conf->pool);
if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Unable to get socket info");
apr_socket_close(listener);
return;
}
if ((rv = apr_socket_bind(listener, sockAddr)) != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Unable to bind to socket %d", rv);
apr_socket_close(listener);
continue; /* try again with next port */
}
/* XXX: 1 need to be ajusted */
if ((rv = apr_socket_listen(listener, 1)) != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Unable to listen to socket %d", rv);
apr_socket_close(listener);
continue; /* try again with next port */
}
break;
}
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Using reversed worker for %s listener %d", url,
listener);
proxy_worker->opaque = listener;
} else {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Using normal worker for %s", url);
proxy_worker->opaque = NULL;
}
my_ap_proxy_add_worker_to_balancer(conf->pool, balancer, *worker);
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"Created: worker for %s %d (status): %d", url,
proxy_worker->id, proxy_worker->s->status);
/* XXX: it may be already filled... */
@@ -177,16 +177,13 @@
/*
* Create workers corresponding to newer nodes.
*/
-static void create_workers_node(request_rec *r)
+static void create_workers_node(proxy_server_conf *conf, apr_pool_t *pool, server_rec
*server)
{
- void *sconf = r->server->module_config;
- proxy_server_conf *conf = (proxy_server_conf *)
- ap_get_module_config(sconf, &proxy_module);
int *ids, num;
/* read the ident of the nodes */
- ids = apr_pcalloc(r->pool, sizeof(int) * node_storage->get_max_size_node());
+ ids = apr_pcalloc(pool, sizeof(int) * node_storage->get_max_size_node());
num = node_storage->get_ids_used_node(ids);
/* XXX: How to skip the balancer that aren't controled by mod_manager */
if (conf->workers->nelts<num) {
@@ -211,16 +208,16 @@
/* read the node and create the worker */
nodeinfo_t *node;
if (node_storage->read_node(ids[j], &node) != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
"create_workers_node can't read id %d",
ids[j]);
continue;
}
- proxy_balancer *balancer = ap_proxy_get_balancer(r->pool, conf,
- apr_pstrcat(r->pool, "balancer://", node->balancer,
NULL));
+ proxy_balancer *balancer = ap_proxy_get_balancer(pool, conf,
+ apr_pstrcat(pool, "balancer://", node->balancer,
NULL));
if (balancer) {
- create_worker(conf, balancer, r, &worker, node);
+ create_worker(conf, balancer, server, &worker, node);
} else {
- ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, NULL,
+ ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
"create_workers_node: Can't find
balancer");
}
}
@@ -475,13 +472,16 @@
int max_lbset = 0;
int checking_standby;
int checked_standby;
+ void *sconf = r->server->module_config;
+ proxy_server_conf *conf = (proxy_server_conf *)
+ ap_get_module_config(sconf, &proxy_module);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
"proxy: Entering bytraffic for BALANCER (%s)",
balancer->name);
/* create workers for new nodes */
- create_workers_node(r);
+ create_workers_node(conf, r->pool, r->server);
/* First try to see if we have available candidate */
do {
@@ -841,7 +841,7 @@
proxy_worker *worker;
/* create the workers (that could be the first time) */
- create_workers_node(r);
+ create_workers_node(conf, r->pool, r->server);
/* search for the worker */
worker = (proxy_worker *)conf->workers->elts;
@@ -855,6 +855,8 @@
return 500;
}
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "proxy_cluster_isup: (first) %d %d %d", worker->id,
worker->s->status, worker->s->lbfactor);
/* Try a ping/pong to check the node */
if (load > 0) {
/* Only try usuable nodes */
@@ -878,15 +880,55 @@
worker->s->status |= PROXY_WORKER_HOT_STANDBY;
}
else {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "proxy_cluster_isup: (AKE) %d %d %d", worker->id,
worker->s->status, worker->s->lbfactor);
worker->s->status &= ~PROXY_WORKER_IN_ERROR;
worker->s->status &= ~PROXY_WORKER_STOPPED;
worker->s->status &= ~PROXY_WORKER_DISABLED;
- worker->s->status &= PROXY_WORKER_HOT_STANDBY;
+ worker->s->status &= ~PROXY_WORKER_HOT_STANDBY;
worker->s->lbfactor = load;
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "proxy_cluster_isup: (AKU) %d %d %d", worker->id,
worker->s->status, worker->s->lbfactor);
}
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "proxy_cluster_isup: %d %d %d", worker->id,
worker->s->status, worker->s->lbfactor);
return 0;
}
+static void * APR_THREAD_FUNC proxy_cluster_watchdog_func(apr_thread_t *thd, void *data)
+{
+ apr_pool_t *pool;
+ server_rec *s = (server_rec *) data;
+ void *sconf = s->module_config;
+ proxy_server_conf *conf = (proxy_server_conf *)
+ ap_get_module_config(sconf, &proxy_module);
+
+ for (;;) {
+ apr_sleep(apr_time_make(0, 1000));
+ /* Create new workers if the shared memory changes */
+ apr_pool_create(&pool, conf->pool);
+ create_workers_node(conf, pool, s);
+ apr_pool_destroy(pool);
+ }
+ apr_thread_exit(thd, 0);
+ return NULL;
+}
+
+/*
+ * Create a thread per process to make maintenance task.
+ */
+static void proxy_cluster_child_init(apr_pool_t *p, server_rec *s)
+{
+ apr_status_t rv;
+ apr_thread_t *wdt;
+
+ rv = apr_thread_create(&wdt, NULL, proxy_cluster_watchdog_func, s, p);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_ERR|APLOG_NOERRNO, 0, s,
+ "proxy_cluster_child_init: apr_thread_create failed");
+ }
+}
+
static int proxy_cluster_post_config(apr_pool_t *p, apr_pool_t *plog,
apr_pool_t *ptemp, server_rec *s)
{
@@ -929,6 +971,9 @@
/* create the provider for the proxy logic */
ap_register_provider(p, PROXY_LBMETHOD, "cluster_bytraffic", "0",
&bytraffic);
+ /* create the "maintenance" thread */
+ ap_hook_child_init(proxy_cluster_child_init, NULL, NULL, APR_HOOK_MIDDLE);
+
}
static void *create_proxy_cluster_dir_config(apr_pool_t *p, char *dir)