A buffer-rewrite trap in NioWorker

"이희승 (Trustin Lee)" trustin at gmail.com
Thu Aug 26 02:07:35 EDT 2010


Nice find.  I've just checked in the fix.  Netty now always performs a
copy before returning the read buffer to the pool.
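
For readers of the archive, here is a minimal sketch of the aliasing problem and of the copy-before-release idea, using only java.nio. It is not the actual NioWorker change. The class PooledBufferDemo, its one-slot "pool", and the method names are hypothetical, and the sketch assumes (as the report below describes) that the direct buffer factory wraps the pooled ByteBuffer instead of copying it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PooledBufferDemo {
    // Hypothetical stand-in for the receive-buffer pool:
    // it always hands back the same direct buffer.
    private static final ByteBuffer POOLED = ByteBuffer.allocateDirect(10);

    static ByteBuffer acquire() {
        POOLED.clear();
        return POOLED;
    }

    // Simulates one read: fills the pooled buffer, then hands out either
    // a view that shares its memory (copy == false) or an independent copy.
    // The payload must fit in the 10-byte pooled buffer.
    static ByteBuffer receive(String payload, boolean copy) {
        ByteBuffer bb = acquire();
        bb.put(payload.getBytes(StandardCharsets.US_ASCII));
        bb.flip();
        if (!copy) {
            // Shares content with POOLED; only position/limit are independent.
            return bb.duplicate();
        }
        ByteBuffer out = ByteBuffer.allocateDirect(bb.remaining());
        out.put(bb);
        out.flip();
        return out;
    }

    // Decodes the buffer without disturbing its position.
    static String text(ByteBuffer buf) {
        return StandardCharsets.US_ASCII.decode(buf.duplicate()).toString();
    }

    // What a delayed handler sees in the first message once a second
    // read has already reused the pooled buffer.
    static String firstMessageAfterSecondRead(boolean copy) {
        ByteBuffer first = receive("++++++++++", copy);
        receive("----------", copy);
        return text(first);
    }

    public static void main(String[] args) {
        System.out.println("shared: " + firstMessageAfterSecondRead(false));
        System.out.println("copied: " + firstMessageAfterSecondRead(true));
    }
}
```

With the shared view, the first message reads back as the second payload; with the copy, it keeps its original content. The copy costs one extra allocation per read, which is the price of letting a handler hold the buffer after the I/O thread has moved on.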

On 08/17/2010 03:41 PM, 伦甫钟 wrote:
> Hi,
> 
> I found a buffer-rewrite trap in NioWorker.
> 
> Here is the code segment of NioWorker (version : 3.2.1.Final):
> 
>        ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
> 
>         ...
> 
>         if (readBytes > 0) {
>             bb.flip();
> 
>             final ChannelBufferFactory bufferFactory =
>                 channel.getConfig().getBufferFactory();
>             final ChannelBuffer buffer = bufferFactory.getBuffer(
>                     bb.order(bufferFactory.getDefaultOrder()));
> 
>             recvBufferPool.release(bb);
> 
>             // Update the predictor.
>             predictor.previousReceiveBufferSize(readBytes);
> 
>             // Fire the event.
>             fireMessageReceived(channel, buffer);
>         } else {
>             recvBufferPool.release(bb);
>         }
> 
> If fireMessageReceived returns immediately because of an ExecutionHandler,
> and the bufferFactory is a DirectChannelBufferFactory, recvBufferPool may
> hand out the same bb (ByteBuffer) in the next round, overwriting the data
> that the earlier ChannelBuffer still wraps.
> 
> 
> Here is my code for showing the trap:
> 
> package cn.cafusic.netty.execution.bug;
> 
> import java.net.InetSocketAddress;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
> 
> import org.jboss.netty.bootstrap.ClientBootstrap;
> import org.jboss.netty.bootstrap.ServerBootstrap;
> import org.jboss.netty.buffer.ChannelBuffer;
> import org.jboss.netty.buffer.ChannelBuffers;
> import org.jboss.netty.buffer.DirectChannelBufferFactory;
> import org.jboss.netty.channel.Channel;
> import org.jboss.netty.channel.ChannelFactory;
> import org.jboss.netty.channel.ChannelFuture;
> import org.jboss.netty.channel.ChannelHandlerContext;
> import org.jboss.netty.channel.ChannelPipeline;
> import org.jboss.netty.channel.ChannelStateEvent;
> import org.jboss.netty.channel.FixedReceiveBufferSizePredictor;
> import org.jboss.netty.channel.MessageEvent;
> import org.jboss.netty.channel.SimpleChannelHandler;
> import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
> import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
> import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
> import org.jboss.netty.handler.execution.ExecutionHandler;
> 
> /**
>  * Trap
>  *
>  * @created 2010-8-17
>  *
>  */
> public class Trap {
> 
>     private static class ClientHandler extends SimpleChannelHandler {
>         @Override
>         public void messageReceived(final ChannelHandlerContext ctx,
>                                     final MessageEvent e) throws Exception {
>             ChannelBuffer buf = (ChannelBuffer) e.getMessage();
>             while (buf.readable()) {
>                 System.out.print((char) buf.readByte());
>             }
>             System.out.println();
>             e.getChannel().close();
>         }
>     }
> 
>     private static class ServerHandler extends SimpleChannelHandler {
>         @Override
>         public void messageReceived(final ChannelHandlerContext ctx,
>                                     final MessageEvent e) throws Exception {
>             TimeUnit.SECONDS.sleep(1); // delay for buffer rewrite.
>             e.getChannel().write(e.getMessage());
>         }
> 
>         @Override
>         public void channelConnected(ChannelHandlerContext ctx,
>                                      ChannelStateEvent e) throws Exception {
>             NioSocketChannelConfig config =
>                 (NioSocketChannelConfig) e.getChannel().getConfig();
>             // zero-copy
>             config.setBufferFactory(new DirectChannelBufferFactory());
>             // fix buffer size requirement
>             config.setReceiveBufferSizePredictor(
>                 new FixedReceiveBufferSizePredictor(10));
>         }
> 
>     }
> 
>     static void serve() {
>         final ChannelFactory factory =
>             new NioServerSocketChannelFactory(
>                 Executors.newSingleThreadExecutor(),
>                 Executors.newSingleThreadExecutor(),
>                 1);
>         final ServerBootstrap bootstrap = new ServerBootstrap(factory);
> 
>         ChannelPipeline pipeline = bootstrap.getPipeline();
>         // async message received
>         pipeline.addLast("execution",
>                          new ExecutionHandler(Executors.newCachedThreadPool()));
>         pipeline.addLast("handler", new ServerHandler());
> 
>         bootstrap.setOption("child.tcpNoDelay", true);
>         bootstrap.setOption("child.keepAlive", true);
>         final Channel bind = bootstrap.bind(new InetSocketAddress(8080));
> 
>         Runtime.getRuntime().addShutdownHook(new Thread() {
>             @Override
>             public void run() {
>                 System.out.println("shutdown");
>                 bind.close().awaitUninterruptibly();
>                 bootstrap.releaseExternalResources();
>             }
>         });
>     }
> 
>     static void connect() {
>         final ChannelFactory factory =
>             new NioClientSocketChannelFactory(
>                 Executors.newSingleThreadExecutor(),
>                 Executors.newSingleThreadExecutor(),
>                 1);
> 
>         final ClientBootstrap bootstrap = new ClientBootstrap(factory);
> 
>         ChannelPipeline pipeline = bootstrap.getPipeline();
>         pipeline.addLast("handler", new ClientHandler());
> 
>         // Client channels take these options without the "child." prefix.
>         bootstrap.setOption("tcpNoDelay", true);
>         bootstrap.setOption("keepAlive", true);
> 
>         ChannelFuture future =
>             bootstrap.connect(new InetSocketAddress("localhost", 8080));
> 
>         Channel channel = future.awaitUninterruptibly().getChannel();
> 
>         channel.write(ChannelBuffers.wrappedBuffer("++++++++++".getBytes()))
>                .awaitUninterruptibly();
>         channel.write(ChannelBuffers.wrappedBuffer("----------".getBytes()))
>                .awaitUninterruptibly();
>         channel.write(ChannelBuffers.wrappedBuffer("==========".getBytes()))
>                .awaitUninterruptibly();
> 
>         channel.getCloseFuture().awaitUninterruptibly();
>         bootstrap.releaseExternalResources();
>     }
> 
>     public static void main(String[] args) {
>         serve();
>         connect();
>         System.exit(0);
>     }
> }
> 
> Are there any improvements in the latest version that avoid this trap?
> 
> Thanks,
> 
> Zhongl
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users

-- 
what we call human nature in actuality is human habit
http://gleamynode.net/
