A buffer-rewrite trap in NioWorker

伦甫钟 zhong.lunfu at gmail.com
Tue Aug 17 02:41:41 EDT 2010


Hi,

I found a buffer-rewrite trap in NioWorker.

Here is the code segment of NioWorker (version : 3.2.1.Final):

       ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);

        ...

        if (readBytes > 0) {
            bb.flip();

            final ChannelBufferFactory bufferFactory =
                channel.getConfig().getBufferFactory();
            final ChannelBuffer buffer = bufferFactory.getBuffer(
                    bb.order(bufferFactory.getDefaultOrder()));

            recvBufferPool.release(bb);

            // Update the predictor.
            predictor.previousReceiveBufferSize(readBytes);

            // Fire the event.
            fireMessageReceived(channel, buffer);
        } else {
            recvBufferPool.release(bb);
        }

if fireMessageReceived return immediately because of ExecutionHandler,
and bufferFactory is DirectChannelBufferFactory, recvBufferPool maybe
acquire the same bb(ByteBuffer) in next round.


Here is my code for showing the trap:

package cn.cafusic.netty.execution.bug;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.DirectChannelBufferFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictor;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import org.jboss.netty.handler.execution.ExecutionHandler;

/**
 * Trap
 *
 * @created 2010-8-17
 *
 */
public class Trap {

    private static class ClientHandler extends SimpleChannelHandler {
        @Override
        public void messageReceived(final ChannelHandlerContext ctx,
                                    final MessageEvent e) throws Exception {
            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
            while (buf.readable()) {
                System.out.print((char) buf.readByte());
            }
            System.out.println();
            e.getChannel().close();
        }
    }

    private static class ServerHandler extends SimpleChannelHandler {
        @Override
        public void messageReceived(final ChannelHandlerContext ctx,
                                    final MessageEvent e) throws Exception {
            TimeUnit.SECONDS.sleep(1); // delay for buffer rewrite.
            e.getChannel().write(e.getMessage());
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx,
                                     ChannelStateEvent e) throws Exception {
            NioSocketChannelConfig config =
                (NioSocketChannelConfig) e.getChannel().getConfig();
            config.setBufferFactory(new DirectChannelBufferFactory());
// zero-copy
            config.setReceiveBufferSizePredictor(new
FixedReceiveBufferSizePredictor(10)); // fix buffer size requirement
        }

    }

    static void serve() {
        final ChannelFactory factory =
            new
NioServerSocketChannelFactory(Executors.newSingleThreadExecutor(),

Executors.newSingleThreadExecutor(),
                                              1);
        final ServerBootstrap bootstrap = new ServerBootstrap(factory);

        ChannelPipeline pipeline = bootstrap.getPipeline();
        pipeline.addLast("execution",
                         new
ExecutionHandler(Executors.newCachedThreadPool())); // async message
received
        pipeline.addLast("handler", new ServerHandler());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        final Channel bind = bootstrap.bind(new InetSocketAddress(8080));

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("shutdown");
                bind.close().awaitUninterruptibly();
                bootstrap.releaseExternalResources();
            }
        });
    }

    static void connect() {
        final ChannelFactory factory =
            new
NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(),

Executors.newSingleThreadExecutor(),
                                              1);

        final ClientBootstrap bootstrap = new ClientBootstrap(factory);

        ChannelPipeline pipeline = bootstrap.getPipeline();
        pipeline.addLast("handler", new ClientHandler());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        ChannelFuture future =
            bootstrap.connect(new InetSocketAddress("localhost", 8080));

        Channel channel = future.awaitUninterruptibly().getChannel();

        channel.write(ChannelBuffers.wrappedBuffer("++++++++++".getBytes()))
               .awaitUninterruptibly();
        channel.write(ChannelBuffers.wrappedBuffer("----------".getBytes()))
               .awaitUninterruptibly();
        channel.write(ChannelBuffers.wrappedBuffer("==========".getBytes()))
               .awaitUninterruptibly();

        channel.getCloseFuture().awaitUninterruptibly();
        bootstrap.releaseExternalResources();
    }

    public static void main(String[] args) {
        serve();
        connect();
        System.exit(0);
    }
}

Is threre any improvements can avoid this trap in lastest version ?

Thanks,

Zhongl


More information about the netty-users mailing list