ClosedChannelException with concurrent connections - what am I doing wrong?

Nate Murray nate at natemurray.com
Tue Aug 10 13:31:09 EDT 2010


Hey Friends,

I'm trying to build the most basic web server using Netty, but I can't get
past about 300 concurrent connections without it throwing exceptions and
closing connections.

My long term goal is to have an asynchronous server that will take an
incoming request, perform a job that will take 3-5 seconds and then handle
the next request. This is why I am using
OrderedMemoryAwareThreadPoolExecutor.
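
For illustration only, the offloading pattern described above can be
sketched with plain java.util.concurrent instead of Netty. This is a minimal
sketch under assumptions: the class SlowJobSketch, the method runJobs, and
the sleep standing in for the 3-5 second job are all hypothetical, and a
plain fixed thread pool does not provide the per-channel event ordering that
OrderedMemoryAwareThreadPoolExecutor does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it is not part of Netty or of the server code.
class SlowJobSketch {

    // Submits `jobs` simulated slow tasks to a pool of `workers` threads
    // and returns their results in submission order.
    static List<String> runJobs(int jobs, int workers, final long jobMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<String>> pending = new ArrayList<Future<String>>();
            for (int i = 0; i < jobs; i++) {
                final int id = i;
                // submit() returns at once, so the submitting thread (the
                // I/O thread in a real server) can take the next request.
                pending.add(pool.submit(new Callable<String>() {
                    public String call() throws Exception {
                        Thread.sleep(jobMillis); // stand-in for the slow job
                        return "response " + id;
                    }
                }));
            }
            // Collect the results; get() blocks until each job is done.
            List<String> results = new ArrayList<String>();
            for (Future<String> f : pending) {
                results.add(f.get());
            }
            return results;
        } catch (Exception ex) {
            throw new IllegalStateException("job failed", ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Three 100 ms jobs on three workers run side by side.
        System.out.println(runJobs(3, 3, 100));
    }
}
```

With as many workers as jobs, the simulated jobs overlap instead of running
one after another, which is the behaviour the ExecutionHandler in the
pipeline is meant to give the real server.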


Here is the stack trace I'm getting:

    java.nio.channels.ClosedChannelException
        at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:646)
        at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:367)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:137)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:76)
        at org.jboss.netty.channel.Channels.write(Channels.java:632)
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70)
        at org.jboss.netty.handler.execution.ExecutionHandler.handleDownstream(ExecutionHandler.java:167)
        at org.jboss.netty.channel.Channels.write(Channels.java:611)
        at org.jboss.netty.channel.Channels.write(Channels.java:578)
        at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259)
        at clobie.HttpRequestHandler.writeResponse(Unknown Source)
        at clobie.HttpRequestHandler.messageReceived(Unknown Source)
        at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69)
        at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:316)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:637)



and here is my server code:


package sample;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map.Entry;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.Cookie;
import org.jboss.netty.handler.codec.http.CookieDecoder;
import org.jboss.netty.handler.codec.http.CookieEncoder;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
import org.jboss.netty.handler.codec.http.HttpContentCompressor;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.jboss.netty.util.CharsetUtil;
import static org.jboss.netty.channel.Channels.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.*;

public class TestHttpServer {
    public static void main(String[] args) {
        System.out.println("starting webserver");
        // Configure the server.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        // Set up the event pipeline factory.
        bootstrap.setPipelineFactory(new HttpServerPipelineFactory());

        // Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(3335));
    }
}

class HttpServerPipelineFactory implements ChannelPipelineFactory {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();
        pipeline.addLast("decoder", new HttpRequestDecoder());
        pipeline.addLast("encoder", new HttpResponseEncoder());
        pipeline.addLast("pipelineExecutor",
                         new ExecutionHandler(
                                 new OrderedMemoryAwareThreadPoolExecutor(1000, 0, 0)));
        pipeline.addLast("handler", new HttpRequestHandler());
        return pipeline;
    }
}

class HttpRequestHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        System.out.println("message received");
        StringBuilder buf = new StringBuilder();
        buf.setLength(0);
        buf.append("hello\r\n");
        buf.append("\r\n");

        writeResponse(e, buf);
    }

    private void writeResponse(MessageEvent e, StringBuilder buf) {
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        ChannelBuffer content = ChannelBuffers.copiedBuffer(buf.toString(),
                CharsetUtil.UTF_8);
        response.setContent(content);
        response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.setHeader(CONTENT_LENGTH, String.format("%d",
                content.readableBytes()));

        ChannelFuture future = e.getChannel().write(response);
        future.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
            throws Exception {
        e.getCause().printStackTrace();
        System.out.println("Got an Exception: " +
                e.getCause().getMessage());
        e.getChannel().close();
    }
}


compile then run with:


 java -cp target/classes:$(cat .classpath) -Xmx2g sample.TestHttpServer


then test:


$ ab -n 500 -c 500 http://127.0.0.1:3335/

Benchmarking 127.0.0.1 (be patient)

Test aborted after 10 failures

apr_socket_connect(): Connection refused (61)


Given that I've read reports of Netty solving the c10k problem, it seems to
me that 500 concurrent connections is fairly small.

What am I doing wrong here? What can I change to handle more concurrent
connections?

Thanks for your help,

Nate