(SOLVED) Re: How to properly handle decoder chain in netty?

Jiang Bian timobj at gmail.com
Wed Jun 3 19:53:33 EDT 2009


Thank you to both Frederic and Christian; the problem seems to be solved.

I merged the two decoders into a single JigDFSCommandDecoder that extends
FrameDecoder.

And as Christian said, there shouldn't be any corruption over TCP,
especially since I am testing through loopback.

Here is my code; I hope it helps someone else.

package org.jigdfs.simulation.protocol.codec;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.jigdfs.simulation.protocol.JigDFSCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelPipelineCoverage("one") // FrameDecoder buffers bytes per channel, so do not share an instance
public class JigDFSCommandDecoder extends FrameDecoder {

    private static final Logger logger = LoggerFactory
            .getLogger(JigDFSCommandDecoder.class);

    @Override
    protected Object decode(ChannelHandlerContext ctx,
            Channel channel,
            ChannelBuffer buf) throws Exception {

        if (!buf.readable()) {
            logger.debug("not readable...");
            return null;
        }

        // Wait for the 8-byte header (magic number + command). Returning
        // null makes FrameDecoder keep the bytes and call decode() again
        // once more data has arrived.
        if (buf.readableBytes() < 8) {
            logger.debug("smaller than 8 bytes");
            return null;
        }

        // Remember where this frame starts so we can rewind if it turns
        // out to be incomplete.
        buf.markReaderIndex();

        int magicNumber = buf.readInt();
        if (magicNumber != JigDFSCommand.JIGDFS_MAGIC_NUMBER) {
            logger.info("It's not a JigDFS message! " + buf.readerIndex()
                    + "; readables: " + buf.readableBytes());

            // Note: after the rewind the bad bytes stay at the front of
            // the buffer, so the next call will see them again.
            buf.resetReaderIndex();
            return null;
        }

        int command = buf.readInt();

        JigDFSCommand jigDFSCommand = new JigDFSCommand(command);
        Object decodedObject = null;
        switch (command) {
        case JigDFSCommand.CHUNK_DATA:
            decodedObject = new FileChunkDecoder().decode(buf);
            break;
        case JigDFSCommand.SAVE_SLICE:
            decodedObject = new SaveSliceCommandDecoder().decode(buf);
            break;
        default:
            // Any other command: attach the buffer as it is.
            decodedObject = buf;
            break;
        }

        // The sub-decoder did not have enough data: rewind to the start
        // of the frame and wait for more.
        if (decodedObject == null) {
            buf.resetReaderIndex();
            return null;
        }

        jigDFSCommand.setAttachment(decodedObject);

        return jigDFSCommand;
    }

}
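
For anyone reading this later, the "return null until the whole frame is
there" idea can be tried without Netty on the classpath. Below is a minimal
sketch using plain java.nio.ByteBuffer. It is not Netty's FrameDecoder, only
an imitation of the same mark / rewind / return-null pattern. The frame
layout (magic, command, payload length, payload), the MAGIC value, and the
names CumulationSketch, Frame, offer and encode are all assumptions made for
illustration; the real JigDFS constants and sub-decoders are not shown in
this thread.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class CumulationSketch {

    // Hypothetical value; the real JIGDFS_MAGIC_NUMBER is not shown here.
    static final int MAGIC = 0x4A494744;
    // Assumed header: magic (int) + command (int) + payload length (int).
    static final int HEADER_BYTES = 12;

    /** A decoded frame: the command plus its payload bytes. */
    static final class Frame {
        final int command;
        final byte[] payload;

        Frame(int command, byte[] payload) {
            this.command = command;
            this.payload = payload;
        }
    }

    // Undecoded bytes carried over between calls (fixed size for the sketch).
    private final ByteBuffer cumulation = ByteBuffer.allocate(4096);

    /** Adds newly received bytes; returns a Frame, or null if more data is needed. */
    Frame offer(byte[] fragment) {
        cumulation.put(fragment);   // append while in write mode
        cumulation.flip();          // switch to read mode
        try {
            return decode(cumulation);
        } finally {
            cumulation.compact();   // keep leftovers, back to write mode
        }
    }

    /** Same contract as a frame decoder: null means "not enough data yet". */
    static Frame decode(ByteBuffer buf) {
        if (buf.remaining() < HEADER_BYTES) {
            return null;
        }
        buf.mark();                 // remember the start of the frame
        int magic = buf.getInt();
        if (magic != MAGIC) {
            throw new IllegalStateException("bad magic number");
        }
        int command = buf.getInt();
        int length = buf.getInt();
        if (length < 0) {
            throw new IllegalStateException("negative payload length");
        }
        if (buf.remaining() < length) {
            buf.reset();            // rewind so the header is read again next time
            return null;
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return new Frame(command, payload);
    }

    /** Builds one frame in the assumed layout, for trying the decoder out. */
    static byte[] encode(int command, byte[] payload) {
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .putInt(MAGIC).putInt(command).putInt(payload.length)
                .put(payload).array();
    }

    public static void main(String[] args) {
        byte[] frame = encode(7, new byte[] {1, 2, 3});
        CumulationSketch decoder = new CumulationSketch();
        // Deliver the 15-byte frame in three pieces, as TCP is free to do.
        System.out.println(decoder.offer(Arrays.copyOfRange(frame, 0, 5)));  // prints null
        System.out.println(decoder.offer(Arrays.copyOfRange(frame, 5, 13))); // prints null
        Frame f = decoder.offer(Arrays.copyOfRange(frame, 13, frame.length));
        System.out.println(f.command + " " + Arrays.toString(f.payload));    // prints 7 [1, 2, 3]
    }
}
```

The second call is the interesting one: the header is complete but the
payload is not, so the sketch rewinds to the mark and waits, which is what my
decoder above does through resetReaderIndex().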

Thanks,

Jiang 



Jiang Bian wrote:
> 
> Frederic -
> 
> Thank you for the answer.
> 
> I agree with you on the first point. I did try to extend FrameDecoder the
> very first time; I can't remember why, but it didn't work (probably for
> some stupid reason). So I looked at FrameDecoder's source code and tried
> to mimic it, but I forgot the "cumulation" part (in FrameDecoder, if there
> is not enough data to decode, it is kept in the cumulation ChannelBuffer).
> I will definitely go fix that.
> 
> I didn't merge the two decoders into one simply because I have a couple of
> other decoders for different purposes, so I feel it would be hard to
> maintain if I put everything in one decoder.
> 
> Also, I do have a PipelineFactory on the server side.
> 
> Thanks again,
> 
> Jiang
> 
> 
> 
> Frederic Bregier wrote:
>> 
>> Jiang,
>> 
>> From what I've read, here are some remarks.
>> Perhaps I did not see everything, but it is a start. I think it is normal
>> that you get such errors on transmission; see below why...
>> 
>> 1) JigDFSCommandDecoder should not extend SimpleChannelUpstreamHandler;
>> it should probably extend FrameDecoder or ReplayingDecoder (see the API
>> docs for the difference).
>> The reason is that you probably want the next decoder to be called only
>> when a full JigDFSCommand is ready to be processed.
>> In your code, if you receive only part of the chunk (say the first 8
>> bytes) and no more, you will continue into the next decoder without
>> having the necessary data. Moreover, if you have fewer than 8 bytes for
>> some reason (TCP splits data; I don't remember what it is called, but it
>> is referred to in the API docs), you will just ignore this data and lose
>> it (by default, Netty will not offer you the same bytes again). So in
>> both cases you will lose data because you don't store it.
>> 
>> That is, unless you use a FrameDecoder (or ReplayingDecoder) as the first
>> handler, since it will remember what you have received until there is
>> enough for a full chunk. Remaining data will be passed in on the next
>> call.
>> Take a look at the example in chapter 1.7 of the Netty User Guide.
>> 
>> 2) The consequence of 1 is that your two decoders will probably be merged
>> into one, since the second also decodes some info from the ChannelBuffer.
>> For instance, you will first have to check the first int, as you do:
>> 
>> protected Object decode(ChannelHandlerContext ctx, Channel channel,
>>         ChannelBuffer input) throws Exception {
>>     ...
>>     // Remember the start of the frame so we can rewind if it is incomplete.
>>     input.markReaderIndex();
>>     int magicNumber = input.readInt();
>>     if (magicNumber != JigDFSCommand.JIGDFS_MAGIC_NUMBER) {
>>         logger.info("It's not a JigDFS message! "
>>                 + input.readerIndex() + "; readables: "
>>                 + input.readableBytes());
>> 
>>         // logger.debug("Hexdump: " + ChannelBuffers.hexDump(input));
>>         // Here, for instance, throw an exception in order to close the
>>         // channel, or send a message back to the client to say it is wrong.
>>         throw new MyException("Bad JigDFS, do something with it");
>>     }
>> 
>> Then check the size according to the command you read too:
>> 
>>     int command = input.readInt();
>>     if (command != JigDFSCommand.CHUNK_DATA) {
>>         return cmd; // or the object you want
>>     }
>>     // Otherwise try to read the necessary info:
>>     // segment (4) + md5 (16) + length (4) = 24 bytes.
>>     if (input.readableBytes() < 24) {
>>         input.resetReaderIndex(); // keep the header for the next call
>>         return null; // not enough data
>>     }
>> 
>> Now read the following parts...
>> 
>>     int segment = input.readInt();
>> 
>>     // Read the md5.
>>     byte[] md5 = new byte[16];
>>     input.readBytes(md5);
>> 
>>     // Read the length field.
>>     int length = input.readInt();
>> 
>>     // Make sure there are enough bytes in the buffer.
>>     if (input.readableBytes() < length) {
>>         input.resetReaderIndex(); // keep everything for the next call
>>         return null;
>>     }
>>     // Now you can read the data...
>>     byte[] payload = new byte[length];
>>     input.readBytes(payload);
>> 
>>     // ...and return the chunk.
>>     Chunk chunk = new Chunk(payload, md5, segment);
>>     return chunk;
>> }
>> 
>> 3) Also, this only works if you use a ChannelPipelineFactory on the
>> server side, because each decoder instance must be unique to its channel
>> (the decoder remembers the unused bytes from the last call). This
>> ChannelPipelineFactory should create a new decoder handler for each
>> connection.
>> So for correctness you should also write @ChannelPipelineCoverage("one")
>> (it is only "decorative" right now in Netty, but it is intended to
>> document what the code is supposed to do).
>> 
>> 4) Then you just have to insert two handlers: one for the decoder (the
>> two decoders in one handler), and one for your business work (the one
>> that takes a Chunk object or JigDFSCommand object as argument).
>> 
>> HTH,
>> Frederic
>> 
>> 
> 
> 
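
Frederic's third point in the quoted message, one decoder instance per
connection, can be illustrated with a small plain-Java sketch (no Netty
involved). Everything in it is hypothetical: IntFrameDecoder is a toy
stateful decoder whose "frame" is a single 4-byte int, and DecoderRegistry
only plays the role that a ChannelPipelineFactory plays in Netty by handing
out a fresh decoder per connection id. It shows what goes wrong when two
connections share one stateful decoder and their fragments interleave.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PerConnectionDecoderSketch {

    /** Toy stateful decoder: buffers bytes until a whole 4-byte int is available. */
    static class IntFrameDecoder {
        private final ByteBuffer pending = ByteBuffer.allocate(64);

        List<Integer> feed(byte[] fragment) {
            pending.put(fragment);  // append while in write mode
            pending.flip();         // switch to read mode
            List<Integer> out = new ArrayList<>();
            while (pending.remaining() >= 4) {
                out.add(pending.getInt());
            }
            pending.compact();      // keep leftover bytes for the next call
            return out;
        }
    }

    /** Stand-in for a pipeline factory: one fresh decoder per connection id. */
    static class DecoderRegistry {
        private final Map<String, IntFrameDecoder> byConnection = new HashMap<>();

        IntFrameDecoder forConnection(String id) {
            return byConnection.computeIfAbsent(id, k -> new IntFrameDecoder());
        }
    }

    /** Returns the first or second two bytes of the big-endian encoding of value. */
    static byte[] half(int value, boolean first) {
        byte[] all = ByteBuffer.allocate(4).putInt(value).array();
        return first ? new byte[] {all[0], all[1]} : new byte[] {all[2], all[3]};
    }

    public static void main(String[] args) {
        int a = 0x11112222;
        int b = 0x33334444;

        // One shared decoder: halves from two connections get glued together.
        IntFrameDecoder shared = new IntFrameDecoder();
        List<Integer> mixed = new ArrayList<>();
        mixed.addAll(shared.feed(half(a, true)));
        mixed.addAll(shared.feed(half(b, true)));
        mixed.addAll(shared.feed(half(a, false)));
        mixed.addAll(shared.feed(half(b, false)));
        for (int v : mixed) {
            System.out.println("shared: " + Integer.toHexString(v));
        }
        // prints "shared: 11113333" then "shared: 22224444"

        // One decoder per connection: each stream is reassembled on its own.
        DecoderRegistry registry = new DecoderRegistry();
        List<Integer> clean = new ArrayList<>();
        clean.addAll(registry.forConnection("A").feed(half(a, true)));
        clean.addAll(registry.forConnection("B").feed(half(b, true)));
        clean.addAll(registry.forConnection("A").feed(half(a, false)));
        clean.addAll(registry.forConnection("B").feed(half(b, false)));
        for (int v : clean) {
            System.out.println("per connection: " + Integer.toHexString(v));
        }
        // prints "per connection: 11112222" then "per connection: 33334444"
    }
}
```

In a real server this interleaving only shows up with more than one client
connected, which may be why a single-client loopback test can pass even with
a shared decoder instance.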

-- 
View this message in context: http://n2.nabble.com/How-to-properly-handle-decoder-chain-in-netty--tp3015408p3021798.html
Sent from the Netty User Group mailing list archive at Nabble.com.



