Author: jfrederic.clere(a)jboss.com
Date: 2008-04-24 04:29:42 -0400 (Thu, 24 Apr 2008)
New Revision: 1559
Modified:
sandbox/httpd/src/native/mod_proxy_cluster/mod_proxy_cluster.c
Log:
First try to have a reversed logic to work.
Modified: sandbox/httpd/src/native/mod_proxy_cluster/mod_proxy_cluster.c
===================================================================
--- sandbox/httpd/src/native/mod_proxy_cluster/mod_proxy_cluster.c 2008-04-23 11:43:57 UTC
(rev 1558)
+++ sandbox/httpd/src/native/mod_proxy_cluster/mod_proxy_cluster.c 2008-04-24 08:29:42 UTC
(rev 1559)
@@ -47,6 +47,21 @@
static struct context_storage_method *context_storage = NULL;
/*
+ * XXX: Should use the proxy_util.c one?.
+ */
+static void my_ap_proxy_add_worker_to_balancer(apr_pool_t *pool, proxy_balancer
*balancer,
+ proxy_worker *worker)
+{
+ proxy_worker *runtime;
+
+ runtime = apr_array_push(balancer->workers);
+ memcpy(runtime, worker, sizeof(proxy_worker));
+ /* Increase the total runtime count */
+ proxy_lb_workers++;
+
+}
+
+/*
* Create/Get the worker before using it
*/
static void create_worker(proxy_server_conf *conf, proxy_balancer *balancer,
@@ -82,10 +97,77 @@
proxy_worker->id = node->mess.id;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
"Created: worker for %s", url);
- proxy_worker->s->status = PROXY_WORKER_INITIALIZED;
+ /*
+ * XXX: The Shared datastatus may already contains a valid information
+ */
+ if (!proxy_worker->s->status)
+ proxy_worker->s->status = PROXY_WORKER_INITIALIZED;
- ap_proxy_add_worker_to_balancer(conf->pool, balancer, *worker);
+ /*
+ * Process the reversed logic:
+ * proxy_worker->opaque will contain the socket (server)
+ * is reversed connection and a NULL otherwise.
+ * If something more need to be allocated it must use the
+ * conf->pool to persist through requests.
+ */
+ if (node->mess.reversed) {
+ /* Create a "server_socket" for the reversed connection */
+ apr_sockaddr_t *sockAddr;
+ apr_socket_t *listener;
+ apr_status_t rv;
+ int port = atoi(node->mess.Port);
+ int i;
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "Using reversed worker for %s", url);
+ /* XXX: change 10 into a CONFIG received value */
+ for (i=0; i<10; i++) {
+ rv = apr_socket_create(&listener, APR_INET, SOCK_STREAM,
+ APR_PROTO_TCP, conf->pool);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "Unable to create server socket");
+ return;
+ }
+
+ rv = apr_sockaddr_info_get(&sockAddr, node->mess.Host,
APR_UNSPEC,
+ port + i, 0, conf->pool);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "Unable to get socket info");
+ apr_socket_close(listener);
+ return;
+ }
+
+ if ((rv = apr_socket_bind(listener, sockAddr)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "Unable to bind to socket %d", rv);
+ apr_socket_close(listener);
+ continue; /* try again with next port */
+ }
+ /* XXX: 1 need to be ajusted */
+ if ((rv = apr_socket_listen(listener, 1)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "Unable to listen to socket %d", rv);
+ apr_socket_close(listener);
+ continue; /* try again with next port */
+ }
+ break;
+ }
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "Using reversed worker for %s listener %d", url,
listener);
+ proxy_worker->opaque = listener;
+ } else {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "Using normal worker for %s", url);
+ proxy_worker->opaque = NULL;
+ }
+
+ my_ap_proxy_add_worker_to_balancer(conf->pool, balancer, *worker);
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "Created: worker for %s %d (status): %d", url,
proxy_worker->id, proxy_worker->s->status);
+
+ /* XXX: it may be already filled... */
strncpy(proxy_worker->s->route, node->mess.JVMRoute,
PROXY_WORKER_MAX_ROUTE_SIZ);
proxy_worker->s->route[PROXY_WORKER_MAX_ROUTE_SIZ] = '\0';
/* XXX: We need that information from TC */
@@ -221,6 +303,7 @@
/* Always return the SUCCESS */
return APR_SUCCESS;
}
+
/* Retrieve the parameter with the given name
* Something like 'JSESSIONID=12345...N'
* XXX: Should use the mod_proxy_balancer ones.
@@ -405,6 +488,8 @@
checking_standby = checked_standby = 0;
while (!mycandidate && !checked_standby) {
worker = (proxy_worker *)balancer->workers->elts;
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
+ "proxy: bytraffic for BALANCER (%d)",
balancer->workers->nelts);
for (i = 0; i < balancer->workers->nelts; i++, worker++) {
if (!checking_standby) { /* first time through */
if (worker->s->lbset > max_lbset)
@@ -422,6 +507,7 @@
*/
if (!PROXY_WORKER_IS_USABLE(worker))
ap_proxy_retry_worker("BALANCER", worker, r->server);
+
/* Take into calculation only the workers that are
* not in error state or not disabled.
*/
@@ -443,6 +529,205 @@
}
/*
+ * Check if the socket is still connected
+ * XXX: Should use the mod_proxy_balancer ones.
+ */
+#define USE_ALTERNATE_IS_CONNECTED 1
+
+#if !defined(APR_MSG_PEEK) && defined(MSG_PEEK)
+#define APR_MSG_PEEK MSG_PEEK
+#endif
+
+#if USE_ALTERNATE_IS_CONNECTED && defined(APR_MSG_PEEK)
+static int is_socket_connected(apr_socket_t *socket)
+{
+ apr_pollfd_t pfds[1];
+ apr_status_t status;
+ apr_int32_t nfds;
+
+ pfds[0].reqevents = APR_POLLIN;
+ pfds[0].desc_type = APR_POLL_SOCKET;
+ pfds[0].desc.s = socket;
+
+ do {
+ status = apr_poll(&pfds[0], 1, &nfds, 0);
+ } while (APR_STATUS_IS_EINTR(status));
+
+ if (status == APR_SUCCESS && nfds == 1 &&
+ pfds[0].rtnevents == APR_POLLIN) {
+ apr_sockaddr_t unused;
+ apr_size_t len = 1;
+ char buf[1];
+ /* The socket might be closed in which case
+ * the poll will return POLLIN.
+ * If there is no data available the socket
+ * is closed.
+ */
+ status = apr_socket_recvfrom(&unused, socket, APR_MSG_PEEK,
+ &buf[0], &len);
+ if (status == APR_SUCCESS && len)
+ return 1;
+ else
+ return 0;
+ }
+ else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
+ return 1;
+ }
+ return 0;
+
+}
+#else
+static int is_socket_connected(apr_socket_t *sock)
+
+{
+ apr_size_t buffer_len = 1;
+ char test_buffer[1];
+ apr_status_t socket_status;
+ apr_interval_time_t current_timeout;
+
+ /* save timeout */
+ apr_socket_timeout_get(sock, ¤t_timeout);
+ /* set no timeout */
+ apr_socket_timeout_set(sock, 0);
+ socket_status = apr_socket_recv(sock, test_buffer, &buffer_len);
+ /* put back old timeout */
+ apr_socket_timeout_set(sock, current_timeout);
+ if (APR_STATUS_IS_EOF(socket_status) ||
+ APR_STATUS_IS_ECONNRESET(socket_status))
+ return 0;
+ else
+ return 1;
+}
+#endif /* USE_ALTERNATE_IS_CONNECTED */
+/*
+ * Accept a connection from the backend server
+ */
+PROXY_DECLARE(int) toto_ap_proxy_accept_backend(const char *proxy_function,
+ proxy_conn_rec *conn,
+ proxy_worker *worker,
+ server_rec *s)
+{
+ apr_status_t rv;
+ int connected = 0;
+ int loglevel;
+ apr_sockaddr_t *backend_addr = conn->addr;
+ apr_socket_t *newsock;
+ void *sconf = s->module_config;
+ proxy_server_conf *conf =
+ (proxy_server_conf *) ap_get_module_config(sconf, &proxy_module);
+
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s,
+ "ap_proxy_accept_backend");
+ if (conn->sock) {
+ /*
+ * This increases the connection pool size
+ * but the number of dropped connections is
+ * relatively small compared to connection lifetime
+ */
+ if (!(connected = is_socket_connected(conn->sock))) {
+ apr_socket_close(conn->sock);
+ conn->sock = NULL;
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s,
+ "proxy: %s: backend socket is disconnected.",
+ proxy_function);
+ }
+ }
+
+ /* If we aren't connected we have to accept a connection */
+ while (!connected) {
+ if ((rv = apr_socket_accept(&newsock, worker->opaque,
+ conn->pool)) != APR_SUCCESS) {
+ loglevel = backend_addr->next ? APLOG_DEBUG : APLOG_ERR;
+ ap_log_error(APLOG_MARK, loglevel, rv, s,
+ "proxy: %s: error creating fam %d socket for target
%s",
+ proxy_function,
+ backend_addr->family,
+ worker->hostname);
+ /*
+ * XXX: We loop... But that is a just a test...
+ */
+ continue;
+ }
+
+#if !defined(TPF) && !defined(BEOS)
+ if (worker->recv_buffer_size > 0 &&
+ (rv = apr_socket_opt_set(newsock, APR_SO_RCVBUF,
+ worker->recv_buffer_size))) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, s,
+ "apr_socket_opt_set(SO_RCVBUF): Failed to set "
+ "ProxyReceiveBufferSize, using default");
+ }
+#endif
+
+ rv = apr_socket_opt_set(newsock, APR_TCP_NODELAY, 1);
+ if (rv != APR_SUCCESS && rv != APR_ENOTIMPL) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, s,
+ "apr_socket_opt_set(APR_TCP_NODELAY): "
+ "Failed to set");
+ }
+
+ /* Set a timeout on the socket */
+ if (worker->timeout_set == 1) {
+ apr_socket_timeout_set(newsock, worker->timeout);
+ }
+ else if (conf->timeout_set == 1) {
+ apr_socket_timeout_set(newsock, conf->timeout);
+ }
+ else {
+ apr_socket_timeout_set(newsock, s->timeout);
+ }
+ /* Set a keepalive option */
+ if (worker->keepalive) {
+ if ((rv = apr_socket_opt_set(newsock,
+ APR_SO_KEEPALIVE, 1)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, s,
+ "apr_socket_opt_set(SO_KEEPALIVE): Failed to set"
+ " Keepalive");
+ }
+ }
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s,
+ "proxy: %s: fam %d socket created to connect to %s",
+ proxy_function, backend_addr->family, worker->hostname);
+
+ /* if an error occurred, loop round and try again */
+ if (rv != APR_SUCCESS) {
+ apr_socket_close(newsock);
+ loglevel = backend_addr->next ? APLOG_DEBUG : APLOG_ERR;
+ ap_log_error(APLOG_MARK, loglevel, rv, s,
+ "proxy: %s: attempt to connect to %pI (%s) failed",
+ proxy_function,
+ backend_addr,
+ worker->hostname);
+ backend_addr = backend_addr->next;
+ continue;
+ }
+
+ conn->sock = newsock;
+ connected = 1;
+ }
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s,
+ "ap_proxy_accept_backend: CONNECTED");
+ /*
+ * Put the entire worker to error state if
+ * the PROXY_WORKER_IGNORE_ERRORS flag is not set.
+ * Altrough some connections may be alive
+ * no further connections to the worker could be made
+ */
+ if (!connected && PROXY_WORKER_IS_USABLE(worker) &&
+ !(worker->s->status & PROXY_WORKER_IGNORE_ERRORS)) {
+ worker->s->status |= PROXY_WORKER_IN_ERROR;
+ worker->s->error_time = apr_time_now();
+ ap_log_error(APLOG_MARK, APLOG_ERR, 0, s,
+ "ap_proxy_connect_backend disabling worker for (%s)",
+ worker->hostname);
+ }
+ else {
+ worker->s->error_time = 0;
+ worker->s->retries = 0;
+ }
+ return connected ? OK : DECLINED;
+}
+/*
* Do a ping/pong to the node
*/
static apr_status_t proxy_cluster_try_pingpong(request_rec *r, proxy_worker *worker)
@@ -514,8 +799,14 @@
}
}
- /* Connect to the backend */
- rv = ap_proxy_connect_backend(scheme, conn, worker, r->server);
+ /* Connect to the backend: Check connected/reuse otherwise create new */
+ if (worker->opaque) {
+ /* The opaque points to the server_socket */
+ rv = ap_proxy_accept_backend(scheme, conn, worker, r->server);
+ } else {
+ /* Connect to the backend: Check connected/reuse otherwise create new */
+ rv = ap_proxy_connect_backend(scheme, conn, worker, r->server);
+ }
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, r->server,
"proxy_cluster_try_pingpong: can't connect to
backend");
@@ -628,13 +919,18 @@
NULL
};
+/*
+ * Register the hooks on our module.
+ */
static void proxy_cluster_hooks(apr_pool_t *p)
{
ap_hook_post_config(proxy_cluster_post_config, NULL, NULL, APR_HOOK_MIDDLE);
/* create the provider for the proxy logic */
- ap_register_provider(p, PROXY_LBMETHOD, "cluster_bytraffic", "0",
&bytraffic);
+ ap_register_provider(p, PROXY_LBMETHOD, "cluster_bytraffic", "0",
&bytraffic);
+
}
+
static void *create_proxy_cluster_dir_config(apr_pool_t *p, char *dir)
{
return NULL;