garbled data passed in cumulation buffer for decode(), probably when data re-transmission.
huican ping
pinghuican at gmail.com
Sat Sep 12 23:54:42 EDT 2009
Hello Trustin,
I made a simple standalone server with a slightly modified decoder
from IntegerHeaderFrameDecoder at FrameDecoder class page.
I tried those cases:
For persistent connections, my client code (using c) can run about 86K
msgs/sec at CPU usage of 85% with 1066 bytes input..
For non-persistent connections, it can run about 17K mesg/sec at CPU
usage of 69%.
The number is really really amazing (thanks to netty's really high
performance code).
But I also noticed that exceptions threw out from time to time for the
non-persistent connection, the stack is below.
===========
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
at sun.nio.ch.IOUtil.read(IOUtil.java:206)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
at org.jboss.netty.buffer.HeapChannelBuffer.setBytes(HeapChannelBuffer.java:163)
at org.jboss.netty.buffer.AbstractChannelBuffer.writeBytes(AbstractChannelBuffer.java:432)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:312)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:203)
at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:53)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:619)
==========
1:) As I read the client code, that our client (written by c) seems
dropping connection after got the response back.
2:) Also when the server threw that exception, the client has no
failed transactions. I think it got the response without issue.
Do you have any idea (or pointers) for this exception for furthur study?
Thanks
Huican
On Sat, Sep 12, 2009 at 12:41 AM, huican ping <pinghuican at gmail.com> wrote:
> Hello Trustin,
>
> I have been looking at that garbled channel buffer for my FrameDecoder
> case in these days.
> FYI, my dummy frameDecoder for comming message with structure (for
> example): StartByte+Any-Bytes-Different-Than-StartByte-Trailer+TrailerByte.
>
> So far, I saw both the two failed cases for my 1066 bytes (results in
> 2 messageReceived calls).
> a) good-1024-bytes + bad data. // Since I am waiting on the tailer
> byte, so it receiveTimeout.
> b) bad-1024-bytes + good data. // Since I found the trailer byte,
> but there is no good startByte, so it is bad message.
>
> 1:) I captured and tcpdumped one failed case inside
> million-and-million successful oens, and noticed that the client sent
> in a good message to the server based on the FrameDecoder.
> 2:) and when I read the code at: FrameDecoder.java, method messageReceived().
> Since the client can sends in a message which results in multiple
> messageReceived Call (such as 2 for my input), I just wonder how the
> code below to keep synchronizing the cumulation buffer for these if
> and else cases. What I am worrying is that when the previous
> messageReceived event at the else branch (not done yet), another
> messageReceived event can go into the if branch.
>
> Is this a problem?
>
> Thanks
> Huican Ping
>
> ===============
> public void messageReceived(
> ChannelHandlerContext ctx, MessageEvent e) throws Exception {
>
> Object m = e.getMessage();
> if (!(m instanceof ChannelBuffer)) {
> ctx.sendUpstream(e);
> return;
> }
>
> ChannelBuffer input = (ChannelBuffer) m;
> if (!input.readable()) {
> return;
> }
>
> ChannelBuffer cumulation = cumulation(ctx);
> if (cumulation.readable()) {
> cumulation.discardReadBytes();
> cumulation.writeBytes(input);
> callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
> } else {
> callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
> if (input.readable()) {
> cumulation.writeBytes(input);
> }
> }
> }
> ==========================
>
>
>
> On Thu, Sep 10, 2009 at 10:17 PM, huican ping <pinghuican at gmail.com> wrote:
>> Hello Trustin,
>>
>> I thought I found some place in my parser suspicious, now my logic is
>> pretty much same as the example posted at the FrameDecoder page, and
>> still get the same issue.
>>
>> // My decode() logic.
>> protected Object decode(ChannelHandlerContext ctx, Channel channel,
>> ChannelBuffer buffer) throws Exception {
>>
>> // return if the buffer empty
>> if (buffer.readableBytes() <= 0) {
>> return null;
>> }
>>
>> // Mark the current buffer position before parsing the buffer
>> // because the whole message might not be in the buffer yet.
>> // We will reset the buffer position to the marked position in
>> // that case.
>> buffer.markReaderIndex();
>>
>>
>> for ( loc = buffer.readerIndex(); loc < buffer.writerIndex(); loc++)
>> {
>> // Paser code.
>> // if find the whole message, then return that message
>> // otherwise, it will hit the end of the loop
>> // Inside the loop
>>
>> // I didn't modify the buffer except change its readerIndex if
>> // the whole message is not there yet.
>> }
>>
>>
>> // The whole message were not received yet - return null.
>> // This method will be invoked again when more packets are
>> // received and appended to the buffer.
>>
>> // Reset to the marked position to read the length field again
>> // next time.
>> buffer.resetReaderIndex();
>> return null;
>> }
>>
>> // my getPipeline() logic:
>> pipeline.addLast("decoder", new acmeDecoder());
>> pipeline.addLast("encoder", new acmeEncoder());
>> pipeline.addLast("handler", new acmeServerHandler());
More information about the netty-users
mailing list