garbled data passed in cumulation buffer to decode(), probably during data retransmission

huican ping pinghuican at gmail.com
Sat Sep 12 23:54:42 EDT 2009


Hello Trustin,

I made a simple standalone server with a slightly modified decoder
based on the IntegerHeaderFrameDecoder example on the FrameDecoder class page.

I tried these cases:
For persistent connections, my client code (written in C) can run about 86K
msgs/sec at a CPU usage of 85% with 1066-byte input.
For non-persistent connections, it can run about 17K msgs/sec at a CPU
usage of 69%.

The numbers are really amazing (thanks to Netty's high-performance
code).

But I also noticed that exceptions are thrown from time to time for the
non-persistent connections. The stack trace is below.
===========
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
        at sun.nio.ch.IOUtil.read(IOUtil.java:206)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
        at org.jboss.netty.buffer.HeapChannelBuffer.setBytes(HeapChannelBuffer.java:163)
        at org.jboss.netty.buffer.AbstractChannelBuffer.writeBytes(AbstractChannelBuffer.java:432)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:312)
        at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:203)
        at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:53)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)

==========
1:) From reading the client code, our client (written in C) seems to
drop the connection right after it gets the response back.
2:) Also, when the server threw that exception, the client had no
failed transactions. I think it got the response without issue.

Do you have any idea (or pointers) about this exception for further study?

Thanks
Huican
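
One way to study the "Connection reset by peer" trace above is to separate it from real server-side failures before logging. The sketch below is a stdlib-only helper; the class name PeerDisconnects and the method looksLikePeerDisconnect are hypothetical, and matching on the exception message is an assumption that depends on the platform and JVM. In Netty 3 it could be called from a handler's exceptionCaught() with e.getCause(), but that wiring is not shown here.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Locale;

// Hypothetical helper, not part of Netty.
final class PeerDisconnects {

    private PeerDisconnects() {
    }

    // Returns true when the throwable looks like the remote side
    // dropping the connection rather than a fault in the server.
    static boolean looksLikePeerDisconnect(Throwable cause) {
        if (cause instanceof ClosedChannelException) {
            return true;
        }
        if (!(cause instanceof IOException)) {
            return false;
        }
        String message = cause.getMessage();
        if (message == null) {
            return false;
        }
        // Assumption: these message fragments are what the OS reports;
        // the exact wording is not guaranteed across platforms.
        String lower = message.toLowerCase(Locale.ROOT);
        return lower.contains("connection reset")
                || lower.contains("broken pipe");
    }

    public static void main(String[] args) {
        Throwable t = new IOException("Connection reset by peer");
        System.out.println(looksLikePeerDisconnect(t));
    }
}
```

If the client closes its socket as soon as it has the response, a reset seen by the server after the response was written would fit the observation that no client transactions failed.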


On Sat, Sep 12, 2009 at 12:41 AM, huican ping <pinghuican at gmail.com> wrote:
> Hello Trustin,
>
> I have been looking at that garbled channel buffer in my FrameDecoder
> case these days.
> FYI, my dummy FrameDecoder handles incoming messages with this structure (for
> example): StartByte+Any-Bytes-Different-Than-StartByte-Trailer+TrailerByte.
>
> So far, I have seen both of these failure cases for my 1066-byte input (which
> results in 2 messageReceived calls).
>  a) good-1024-bytes + bad data.   // Since I am waiting for the trailer
> byte, it ends in a receiveTimeout.
>  b) bad-1024-bytes + good data.   // I found the trailer byte,
> but there is no good StartByte, so it is a bad message.
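
The framing and the two failure cases above can be sketched with a plain java.nio.ByteBuffer. This is a minimal illustration, not the decoder from this thread: the class name StartTrailerFrames and the byte values 0x02 and 0x03 are assumptions. A missing trailer yields null (case a, the decoder keeps waiting), and a first byte that is not the start byte is rejected (case b).

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a start-byte/trailer-byte frame decoder.
final class StartTrailerFrames {
    static final byte START = 0x02;   // assumed start byte
    static final byte TRAILER = 0x03; // assumed trailer byte

    private StartTrailerFrames() {
    }

    // Returns one frame (START through TRAILER, inclusive) and advances
    // the buffer past it. Returns null and leaves the position untouched
    // when the trailer has not arrived yet. Throws when the first
    // readable byte is not START.
    static byte[] decode(ByteBuffer buffer) {
        if (!buffer.hasRemaining()) {
            return null;
        }
        int begin = buffer.position();
        // Absolute get(int) reads without moving the position, so no
        // mark/reset is needed while scanning.
        if (buffer.get(begin) != START) {
            throw new IllegalStateException("frame does not begin with START");
        }
        for (int i = begin + 1; i < buffer.limit(); i++) {
            if (buffer.get(i) == TRAILER) {
                byte[] frame = new byte[i - begin + 1];
                buffer.get(frame); // relative get consumes the frame
                return frame;
            }
        }
        return null; // incomplete frame: wait for more bytes
    }
}
```

Because the scan uses absolute reads, an incomplete frame never changes the buffer, which plays the same role as markReaderIndex()/resetReaderIndex() in a ChannelBuffer-based decode().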
>
> 1:) I captured one failed case with tcpdump among
> millions of successful ones, and noticed that the client sent
> a good message to the server, based on the FrameDecoder.
> 2:) Then I read the code in FrameDecoder.java, method messageReceived().
> Since the client can send a message which results in multiple
> messageReceived calls (such as 2 for my input), I wonder how the
> code below keeps the cumulation buffer synchronized across the if
> and else branches. What I am worried about is that while the previous
> messageReceived event is still in the else branch (not done yet), another
> messageReceived event can go into the if branch.
>
> Is this a problem?
>
> Thanks
> Huican Ping
>
> ===============
>    public void messageReceived(
>            ChannelHandlerContext ctx, MessageEvent e) throws Exception {
>
>        Object m = e.getMessage();
>        if (!(m instanceof ChannelBuffer)) {
>            ctx.sendUpstream(e);
>            return;
>        }
>
>        ChannelBuffer input = (ChannelBuffer) m;
>        if (!input.readable()) {
>            return;
>        }
>
>        ChannelBuffer cumulation = cumulation(ctx);
>        if (cumulation.readable()) {
>            cumulation.discardReadBytes();
>            cumulation.writeBytes(input);
>            callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
>        } else {
>            callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
>            if (input.readable()) {
>                cumulation.writeBytes(input);
>            }
>        }
>    }
> ==========================
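
To make the if/else flow quoted above easier to reason about, here is a stdlib-only sketch that mirrors it with java.nio.ByteBuffer. It is not Netty code: the class CumulationSketch, the fixed 4096-byte buffer, and the trailer value 0x03 are assumptions, and the sketch takes it as given that events for one connection arrive one at a time. Under that assumption, a 1066-byte message split into 1024 + 42 bytes goes through the else branch first and the if branch second, and comes out as one frame.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the cumulation logic; single-threaded by design.
final class CumulationSketch {
    static final byte TRAILER = 0x03; // assumed trailer byte

    // Fixed size for brevity; a real decoder would use a growable buffer.
    private final ByteBuffer cumulation = ByteBuffer.allocate(4096);
    final List<byte[]> frames = new ArrayList<>();
    final List<String> branches = new ArrayList<>();

    CumulationSketch() {
        cumulation.flip(); // read mode, nothing readable yet
    }

    // One call per received chunk, mirroring messageReceived().
    void messageReceived(byte[] chunk) {
        ByteBuffer input = ByteBuffer.wrap(chunk);
        if (cumulation.hasRemaining()) {
            branches.add("if");
            cumulation.compact(); // drop consumed bytes, switch to write mode
            cumulation.put(input);
            cumulation.flip();    // back to read mode
            callDecode(cumulation);
        } else {
            branches.add("else");
            callDecode(input);
            if (input.hasRemaining()) {
                // Keep the undecoded remainder for the next event.
                cumulation.clear();
                cumulation.put(input);
                cumulation.flip();
            }
        }
    }

    // Extracts every complete frame (bytes up to and including TRAILER).
    private void callDecode(ByteBuffer buf) {
        while (true) {
            int end = -1;
            for (int i = buf.position(); i < buf.limit(); i++) {
                if (buf.get(i) == TRAILER) {
                    end = i;
                    break;
                }
            }
            if (end < 0) {
                return; // incomplete: leave the position untouched
            }
            byte[] frame = new byte[end - buf.position() + 1];
            buf.get(frame);
            frames.add(frame);
        }
    }
}
```

If two threads could call messageReceived() on the same instance at once, the shared cumulation buffer in this sketch would not be safe, which is the scenario the question above is asking about.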
>
>
>
> On Thu, Sep 10, 2009 at 10:17 PM, huican ping <pinghuican at gmail.com> wrote:
>> Hello Trustin,
>>
>> I thought I had found a suspicious place in my parser. Now my logic is
>> pretty much the same as the example posted on the FrameDecoder page, and
>> I still get the same issue.
>>
>> // My decode() logic.
>> protected Object decode(ChannelHandlerContext ctx, Channel channel,
>>                        ChannelBuffer buffer) throws Exception {
>>
>>        // Return if the buffer is empty.
>>        if (buffer.readableBytes() <= 0) {
>>                return null;
>>        }
>>
>>        // Mark the current buffer position before parsing the buffer
>>        // because the whole message might not be in the buffer yet.
>>        // We will reset the buffer position to the marked position in
>>        // that case.
>>        buffer.markReaderIndex();
>>
>>
>>        for (int loc = buffer.readerIndex(); loc < buffer.writerIndex(); loc++)
>>        {
>>                // Parser code.
>>                // If the whole message is found, return that message;
>>                // otherwise, fall through to the end of the loop.
>>
>>                // Inside the loop, I don't modify the buffer except for changing
>>                // its readerIndex if the whole message is not there yet.
>>        }
>>
>>
>>        // The whole message was not received yet - return null.
>>        // This method will be invoked again when more packets are
>>        // received and appended to the buffer.
>>
>>        // Reset to the marked position to read the length field again
>>        // next time.
>>        buffer.resetReaderIndex();
>>        return null;
>> }
>>
>> // my getPipeline() logic:
>>                pipeline.addLast("decoder", new acmeDecoder());
>>                pipeline.addLast("encoder", new acmeEncoder());
>>                pipeline.addLast("handler", new acmeServerHandler());


