Author: jfrederic.clere(a)jboss.com
Date: 2008-05-27 11:25:56 -0400 (Tue, 27 May 2008)
New Revision: 1622
Modified:
trunk/mod_cluster/native/include/node.h
trunk/mod_cluster/native/mod_manager/mod_manager.c
trunk/mod_cluster/native/mod_manager/node.c
trunk/mod_cluster/native/mod_proxy_cluster/mod_proxy_cluster.c
Log:
Add logic to remove nodes: new node_storage provider methods (worker_nodes_need_update, worker_nodes_are_updated, remove_node) and turn create_workers_node() into update_workers_node() in mod_proxy_cluster so workers are created and removed as the shared node table changes.
Modified: trunk/mod_cluster/native/include/node.h
===================================================================
--- trunk/mod_cluster/native/include/node.h 2008-05-27 06:40:15 UTC (rev 1621)
+++ trunk/mod_cluster/native/include/node.h 2008-05-27 15:25:56 UTC (rev 1622)
@@ -76,7 +76,7 @@
/* config from jboss/tomcat */
nodemess_t mess;
/* filled by httpd */
- unsigned long updatetime; /* time of last received message */
+ apr_time_t updatetime; /* time of last received message */
int offset; /* offset to the proxy_worker_stat structure */
char stat[SIZEOFSCORE]; /* to store the status */
};
@@ -169,5 +169,18 @@
* read the max number of nodes in the shared table
*/
APR_DECLARE(int) (*get_max_size_node)();
+/**
+ * Check the nodes for modifications.
+ * XXX: void *data is in fact a server_rec *.
+ */
+APR_DECLARE(apr_time_t) (*worker_nodes_need_update)(void *data, apr_pool_t *pool);
+/*
+ * Mark that the worker nodes are now up to date.
+ */
+APR_DECLARE(int) (*worker_nodes_are_updated)(void *data);
+/*
+ * Remove the node from shared memory (free the slotmem)
+ */
+APR_DECLARE(int) (*remove_node)(nodeinfo_t *node);
};
#endif /*NODE_H*/
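
Note on the new provider entries: a consumer such as mod_proxy_cluster is expected to poll worker_nodes_need_update(), react to the nodes whose updatetime changed, and then acknowledge with worker_nodes_are_updated(). A minimal sketch of that contract follows; node_storage is assumed to point at the node_storage_method table published by mod_manager, and the refresh_from_shared_memory() name is purely illustrative (the real call sites are in the mod_proxy_cluster.c hunks below):

    #include "httpd.h"
    #include "apr_pools.h"
    #include "apr_time.h"
    #include "node.h"

    /* Assumed to have been looked up from the provider registry at startup. */
    static const struct node_storage_method *node_storage;

    static void refresh_from_shared_memory(server_rec *s, apr_pool_t *pool)
    {
        /* 0 means nothing changed since the last worker_nodes_are_updated() call. */
        apr_time_t last = node_storage->worker_nodes_need_update(s, pool);
        if (last == 0)
            return;

        /* ... create workers for new nodes, call remove_node() for nodes
         *     flagged for removal (see update_workers_node() below) ... */

        /* Record that this process has caught up with the shared table. */
        node_storage->worker_nodes_are_updated(s);
    }
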
Modified: trunk/mod_cluster/native/mod_manager/mod_manager.c
===================================================================
--- trunk/mod_cluster/native/mod_manager/mod_manager.c 2008-05-27 06:40:15 UTC (rev 1621)
+++ trunk/mod_cluster/native/mod_manager/mod_manager.c 2008-05-27 15:25:56 UTC (rev 1622)
@@ -97,6 +97,10 @@
int maxnode;
/* max number of hosts supported */
int maxhost;
+
+ /* last time this process completed a node table update */
+ apr_time_t last_updated;
+
} mod_manager_config;
/*
@@ -114,11 +118,57 @@
{
return(get_max_size_node(nodestatsmem));
}
+static apr_status_t loc_remove_node(nodeinfo_t *node)
+{
+ return (remove_node(nodestatsmem, node));
+}
+
+/* Check if the nodes (in shared memory) were modified since the last
+ * call to worker_nodes_are_updated().
+ * return codes:
+ *   0 : No update of the nodes since last time.
+ *   x : The time this process last recorded an update (1 on the very first call).
+ */
+static apr_time_t loc_worker_nodes_need_update(void *data, apr_pool_t *pool)
+{
+ int size, i;
+ int *id;
+ server_rec *s = (server_rec *) data;
+ apr_time_t last = 0;
+ mod_manager_config *mconf = ap_get_module_config(s->module_config, &manager_module);
+
+ size = get_max_size_node(nodestatsmem);
+ id = apr_palloc(pool, sizeof(int) * size);
+ size = get_ids_used_node(nodestatsmem, id);
+ for (i=0; i<size; i++) {
+ nodeinfo_t *ou;
+ get_node(nodestatsmem, &ou, id[i]);
+ if (ou->updatetime > last)
+ last = ou->updatetime;
+ }
+ if (last >= mconf->last_updated) {
+ if (mconf->last_updated == 0)
+ return(1); /* First time */
+ return(mconf->last_updated);
+ }
+
+ return (0);
+}
+/* Store the last update time in the process config */
+static int loc_worker_nodes_are_updated(void *data)
+{
+ server_rec *s = (server_rec *) data;
+ mod_manager_config *mconf = ap_get_module_config(s->module_config, &manager_module);
+ mconf->last_updated = apr_time_now();
+ return (0);
+}
static const struct node_storage_method node_storage =
{
loc_read_node,
loc_get_ids_used_node,
- loc_get_max_size_node
+ loc_get_max_size_node,
+ loc_worker_nodes_need_update,
+ loc_worker_nodes_are_updated,
+ loc_remove_node
};
/*
@@ -665,12 +715,14 @@
/* Read the node */
node = read_node(nodestatsmem, &nodeinfo);
if (node == NULL) {
+ if (status == REMOVE)
+ return NULL; /* Already done */
*errtype = TYPEMEM;
return MNODERD;
}
/* Process the * APP commands */
- if (strcmp(r->uri, "*") == 0) {
+ if (strcmp(r->uri, "*") == 0 || strcmp(r->uri, "/*") == 0) {
return (process_node_cmd(r, status, errtype, node));
}
@@ -1054,6 +1106,7 @@
mconf->maxcontext = DEFMAXCONTEXT;
mconf->maxnode = DEFMAXNODE;
mconf->maxhost = DEFMAXHOST;
+ mconf->last_updated = 0;
return mconf;
}
@@ -1071,6 +1124,7 @@
mconf->basefilename = NULL;
mconf->maxcontext = DEFMAXCONTEXT;
mconf->maxnode = DEFMAXNODE;
+ mconf->last_updated = 0;
if (mconf2->basefilename)
mconf->basefilename = apr_pstrdup(p, mconf2->basefilename);
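
The change detection above keeps one timestamp per httpd process (mconf->last_updated) and one per node (ou->updatetime). The return contract of loc_worker_nodes_need_update() can be restated in isolation; the helper name below is hypothetical and only re-expresses the comparison already performed above:

    #include "apr_time.h"

    /* newest = largest updatetime found in the shared node table,
     * last_updated = when this process last ran worker_nodes_are_updated(). */
    static apr_time_t nodes_changed_since(apr_time_t newest, apr_time_t last_updated)
    {
        if (newest >= last_updated)
            return (last_updated == 0) ? 1 : last_updated; /* 1 flags the first call */
        return 0;                                          /* nothing changed */
    }

Callers treat any non-zero value as "something changed" and use it as the lower bound when scanning node updatetime values (see update_workers_node() below).
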
Modified: trunk/mod_cluster/native/mod_manager/node.c
===================================================================
--- trunk/mod_cluster/native/mod_manager/node.c 2008-05-27 06:40:15 UTC (rev 1621)
+++ trunk/mod_cluster/native/mod_manager/node.c 2008-05-27 15:25:56 UTC (rev 1622)
@@ -94,7 +94,7 @@
*/
memcpy(ou, in, sizeof(nodemess_t));
ou->mess.id = id;
- ou->updatetime = (unsigned long) apr_time_sec(apr_time_now());
+ ou->updatetime = apr_time_now();
ou->offset = sizeof(nodemess_t) + sizeof(unsigned long) + sizeof(int);
*data = ou;
return APR_SUCCESS;
@@ -122,7 +122,7 @@
memcpy(ou, node, sizeof(nodeinfo_t));
ou->mess.id = ident;
*id = ident;
- ou->updatetime = (unsigned long) apr_time_sec(apr_time_now());
+ ou->updatetime = apr_time_now();
/* set of offset to the proxy_worker_stat */
ou->offset = sizeof(nodemess_t) + sizeof(unsigned long) + sizeof(int);
Modified: trunk/mod_cluster/native/mod_proxy_cluster/mod_proxy_cluster.c
===================================================================
--- trunk/mod_cluster/native/mod_proxy_cluster/mod_proxy_cluster.c 2008-05-27 06:40:15 UTC (rev 1621)
+++ trunk/mod_cluster/native/mod_proxy_cluster/mod_proxy_cluster.c 2008-05-27 15:25:56 UTC (rev 1622)
@@ -119,77 +119,121 @@
}
}
/*
- * Create workers corresponding to newer nodes.
+ * Add a node to the worker conf
*/
-static void create_workers_node(proxy_server_conf *conf, apr_pool_t *pool, server_rec *server)
+static int add_workers_node(nodeinfo_t *node, proxy_server_conf *conf, apr_pool_t *pool, server_rec *server)
{
- int *ids, num;
+ char *name = apr_pstrcat(pool, "cluster://", node->mess.balancer, NULL);
+ proxy_balancer *balancer = ap_proxy_get_balancer(pool, conf, name);
+ proxy_worker *worker = NULL;
+ if (!balancer) {
+ /* Create one */
+ ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
+ "add_workers_node: Create balancer %s", name);
+ balancer = apr_array_push(conf->balancers);
+ memset(balancer, 0, sizeof(proxy_balancer));
+ balancer->name = apr_pstrdup(conf->pool, name);
+ balancer->lbmethod = ap_lookup_provider(PROXY_LBMETHOD, "cluster_bytraffic", "0");
+ balancer->workers = apr_array_make(conf->pool, 5, sizeof(proxy_worker));
+ /* XXX Is this a right place to create mutex */
+#if APR_HAS_THREADS
+ if (apr_thread_mutex_create(&(balancer->mutex),
+ APR_THREAD_MUTEX_DEFAULT, conf->pool) != APR_SUCCESS) {
+ /* XXX: Do we need to log something here */
+ ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
+ "add_workers_node: Can't create lock for
balancer");
+ }
+#endif
+ }
+ if (balancer) {
+ create_worker(conf, balancer, server, &worker, node);
+ } else {
+ ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
+ "add_workers_node: Can't find balancer");
+ }
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
+ "add_workers_node done");
+ return (0);
+}
+/*
+ * Remove a node from the worker conf
+ */
+static int remove_workers_node(nodeinfo_t *node, proxy_server_conf *conf, apr_pool_t *pool, server_rec *server)
+{
+ int i;
+ proxy_worker *worker = (proxy_worker *)conf->workers->elts;
+ for (i = 0; i < conf->workers->nelts; i++) {
+ if (worker->id == node->mess.id)
+ break;
+ worker++;
+ }
+ if (i == conf->workers->nelts) {
+ node_storage->remove_node(node);
+ return 0; /* Done ? */
+ }
+ /* prevent other threads from using it */
+ worker->s->status |= PROXY_WORKER_IN_ERROR;
+
+ /* apr_reslist_acquired_count */
+ i = 0;
+ if (worker->cp->res)
+ i = apr_reslist_acquired_count(worker->cp->res);
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
+ "remove_workers_node %d", i);
+ if (i == 0) {
+ /* No connection in use: clean the worker */
+ apr_pool_destroy(worker->cp->pool);
+ worker->cp->pool = NULL;
+
+ /* XXX: How to remove the worker from the apr_array of the balancer */
+
+ /* remove the node from the shared memory */
+ node_storage->remove_node(node);
+ return (0);
+ } else
+ return (1); /* We should retry later */
+}
+/*
+ * Create/Remove workers corresponding to updated nodes.
+ */
+static void update_workers_node(proxy_server_conf *conf, apr_pool_t *pool, server_rec *server)
+{
+ int *id, size, i;
+ apr_time_t last;
+ int notok = 0;
+
+ /* Check if we have to do something */
+ last = node_storage->worker_nodes_need_update(server, pool);
+
+ /* nodes_need_update will return 1 if last_updated is zero: first time we are called */
+ if (last == 0)
+ return;
+
/* read the ident of the nodes */
-
- ids = apr_pcalloc(pool, sizeof(int) * node_storage->get_max_size_node());
- num = node_storage->get_ids_used_node(ids);
+ id = apr_pcalloc(pool, sizeof(int) * node_storage->get_max_size_node());
+ size = node_storage->get_ids_used_node(id);
+
+ /* XXX: How to skip the balancers that aren't controlled by mod_manager */
- if (conf->workers->nelts<num) {
- /* There are more workers in shared area than in the local tables */
- proxy_worker *worker;
- int i, j;
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
- "create_workers_node starting");
- worker = (proxy_worker *)conf->workers->elts;
- for (i = 0; i < conf->workers->nelts; i++) {
- /* check the id against the table of used ids */
- for (j=0; j<num; j++) {
- if (worker->id == ids[j]) {
- ids[j] = 0;
- break;
- }
- }
- worker++;
- }
- /* create the workers */
- for (j=0; j<num; j++) {
- if (ids[j]) {
- /* read the node and create the worker */
- nodeinfo_t *node;
- if (node_storage->read_node(ids[j], &node) != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
- "create_workers_node can't read id %d",
ids[j]);
- continue;
- }
- char *name = apr_pstrcat(pool, "cluster://", node->mess.balancer, NULL);
- proxy_balancer *balancer = ap_proxy_get_balancer(pool, conf, name);
- if (!balancer) {
- /* Create one */
- ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
- "create_workers_node: Create balancer %s",
name);
- balancer = apr_array_push(conf->balancers);
- memset(balancer, 0, sizeof(proxy_balancer));
- balancer->name = apr_pstrdup(conf->pool, name);
- balancer->lbmethod = ap_lookup_provider(PROXY_LBMETHOD, "cluster_bytraffic", "0");
- balancer->workers = apr_array_make(conf->pool, 5, sizeof(proxy_worker));
- /* XXX Is this a right place to create mutex */
-#if APR_HAS_THREADS
- if (apr_thread_mutex_create(&(balancer->mutex),
- APR_THREAD_MUTEX_DEFAULT, conf->pool) != APR_SUCCESS)
{
- /* XXX: Do we need to log something here */
- ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
- "create_workers_node: Can't create lock
for balancer");
- }
-#endif
- }
- if (balancer) {
- create_worker(conf, balancer, server, &worker, node);
- } else {
- ap_log_error(APLOG_MARK, APLOG_NOTICE|APLOG_NOERRNO, 0, server,
- "create_workers_node: Can't find
balancer");
- }
- }
- }
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
- "create_workers_node done");
- }
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
+ "update_workers_node starting");
+
+ /* Only process the nodes that have been updated since our last update */
+ for (i=0; i<size; i++) {
+ nodeinfo_t *ou;
+ node_storage->read_node(id[i], &ou);
+ if (ou->updatetime >= last) {
+ /* The node has changed */
+ if (ou->mess.remove)
+ notok = notok + remove_workers_node(ou, conf, pool, server);
+ else
+ add_workers_node(ou, conf, pool, server);
+ }
+ }
+ if (! notok)
+ node_storage->worker_nodes_are_updated(server);
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, server,
+ "update_workers_node done");
}
/* reslist constructor */
@@ -576,7 +620,7 @@
balancer->name);
/* create workers for new nodes */
- create_workers_node(conf, r->pool, r->server);
+ update_workers_node(conf, r->pool, r->server);
/* First try to see if we have available candidate */
do {
@@ -743,7 +787,7 @@
proxy_worker *worker;
/* create the workers (that could be the first time) */
- create_workers_node(conf, r->pool, r->server);
+ update_workers_node(conf, r->pool, r->server);
/* search for the worker */
worker = (proxy_worker *)conf->workers->elts;
@@ -808,7 +852,7 @@
apr_sleep(apr_time_make(0, 1000));
/* Create new workers if the shared memory changes */
apr_pool_create(&pool, conf->pool);
- create_workers_node(conf, pool, s);
+ update_workers_node(conf, pool, s);
apr_pool_destroy(pool);
}
apr_thread_exit(thd, 0);
@@ -866,7 +910,7 @@
*/
static int proxy_cluster_trans(request_rec *r)
{
- ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_EMERG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_DEBUG, 0, r->server,
"proxy_cluster_trans for %d %s %s uri: %s",
r->proxyreq, r->filename, r->handler, r->uri);
@@ -875,7 +919,7 @@
r->filename = apr_pstrcat(r->pool, "proxy:cluster://", balancer,
r->uri, NULL);
r->handler = "proxy-server";
r->proxyreq = PROXYREQ_REVERSE;
- ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_EMERG, 0, r->server,
+ ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_DEBUG, 0, r->server,
"proxy_cluster_trans using %s uri: %s",
balancer, r->filename);
return OK; /* Mod_proxy will process it */
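
For context, the background thread whose body is touched above ends up driving the whole add/remove cycle. Condensed, the loop looks roughly like this (thread setup and shutdown handling omitted, variable names as in the hunk):

    /* Condensed view of the periodic refresh: each pass gets a scratch pool,
     * lets update_workers_node() create or remove proxy_workers according to
     * the shared node table, and throws the pool away again. */
    for (;;) {
        apr_pool_t *pool;
        apr_sleep(apr_time_make(0, 1000));   /* interval as in the hunk above */
        apr_pool_create(&pool, conf->pool);
        update_workers_node(conf, pool, s);  /* no-op when worker_nodes_need_update() returns 0 */
        apr_pool_destroy(pool);
    }

Because remove_workers_node() returns 1 while connections are still checked out of the worker's reslist, update_workers_node() skips worker_nodes_are_updated() for that pass, so the removal is retried on later iterations until the acquired-connection count drops to zero.
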