(SOLVED) Re: How to properly handle decoder chain in netty?

Jiang Bian timobj at gmail.com
Thu Jun 4 08:07:23 EDT 2009


Frederic -
Thanks, but I thought ChannelPipelineCoverage("all") means the handler is
stateless or that its member variables are designed to be shared.
Since I don't have any member variables in the handler, it's stateless,
isn't it?

Did I misunderstand the doc? 
http://www.jboss.org/file-access/default/members/netty/freezone/api/3.0/org/jboss/netty/channel/ChannelPipelineCoverage.html

Thanks again for your help!

Jiang
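
One way to look at the "no member variables" question: a class that declares
no fields of its own can still carry state inherited from its base class.
The sketch below does not use Netty at all. CumulatingDecoder,
FourByteFrameDecoder, feed and run are hypothetical names standing in for a
FrameDecoder-style base class that remembers unused bytes between calls (as
described in the quoted messages), and the 4-byte frame size is just an
assumption to keep it small.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SharedDecoderDemo {

    // Hypothetical stand-in for a FrameDecoder-style base class (NOT Netty code):
    // it keeps the bytes that could not be decoded yet.
    static abstract class CumulatingDecoder {
        private byte[] cumulation = new byte[0]; // state inherited by every subclass

        abstract int frameSize();

        // Appends the newly received bytes and returns every complete frame found.
        final List<byte[]> feed(byte[] received) {
            byte[] all = Arrays.copyOf(cumulation, cumulation.length + received.length);
            System.arraycopy(received, 0, all, cumulation.length, received.length);
            List<byte[]> frames = new ArrayList<>();
            int pos = 0;
            while (all.length - pos >= frameSize()) {
                frames.add(Arrays.copyOfRange(all, pos, pos + frameSize()));
                pos += frameSize();
            }
            cumulation = Arrays.copyOfRange(all, pos, all.length); // keep the leftover
            return frames;
        }
    }

    // Declares no fields, yet each instance carries the inherited buffer.
    static class FourByteFrameDecoder extends CumulatingDecoder {
        @Override
        int frameSize() {
            return 4;
        }
    }

    // Channel A sends "AAAA" and channel B sends "BBBB"; each arrives as two
    // 2-byte reads, interleaved A, B, A, B.
    static String run(CumulatingDecoder forA, CumulatingDecoder forB) {
        CumulatingDecoder[] handlers = {forA, forB, forA, forB};
        String[] reads = {"AA", "BB", "AA", "BB"};
        List<String> frames = new ArrayList<>();
        for (int i = 0; i < reads.length; i++) {
            byte[] bytes = reads[i].getBytes(StandardCharsets.US_ASCII);
            for (byte[] frame : handlers[i].feed(bytes)) {
                frames.add(new String(frame, StandardCharsets.US_ASCII));
            }
        }
        return String.join(" ", frames);
    }

    public static void main(String[] args) {
        CumulatingDecoder shared = new FourByteFrameDecoder();
        System.out.println("shared:      " + run(shared, shared));
        // prints "shared:      AABB AABB" (bytes of both channels get mixed)
        System.out.println("per channel: "
                + run(new FourByteFrameDecoder(), new FourByteFrameDecoder()));
        // prints "per channel: AAAA BBBB"
    }
}
```

With one shared instance the leftover bytes of both channels land in the same
buffer, which is the situation a per-channel ("one") decoder avoids.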



Frederic Bregier wrote:
> 
> OK, one last note. It is just "decorative", but in case Netty does
> something with it later on, to be perfect you should write:
> @ChannelPipelineCoverage("one")
> instead of
> @ChannelPipelineCoverage("all")
> since each decoder instance is used by only "one" channel, unlike
> other handlers (like a standard SimpleChannelHandler) which can be
> shared among all channels
> (1 instance of handler for all channels = "all", 1 instance of handler for
> each channel = "one").
> 
> Again, right now it is only decorative in Netty (it tells the reader
> what the purpose of the handler is), so it does not change anything at
> runtime or at compile time...
> 
> Glad that you fixed your code!
> Cheers,
> 
> And thanks again to Christian for the right info on TCP!!!
> 
> Frederic
> 
> 
> Jiang Bian wrote:
>> 
>> Thank you to both Frederic and Christian, the problem seems to be solved.
>> 
>> I merged the two decoders into one, JigDFSCommandDecoder, which extends
>> FrameDecoder.
>> 
>> And as Christian said, there shouldn't be any corruption using TCP,
>> especially since I am running the test over loopback.
>> 
>> Here is my code; I hope it helps someone else.
>> 
>> package org.jigdfs.simulation.protocol.codec;
>> 
>> import org.jboss.netty.buffer.ChannelBuffer;
>> import org.jboss.netty.channel.Channel;
>> import org.jboss.netty.channel.ChannelHandlerContext;
>> import org.jboss.netty.channel.ChannelPipelineCoverage;
>> import org.jboss.netty.handler.codec.frame.FrameDecoder;
>> import org.jigdfs.simulation.protocol.JigDFSCommand;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> @ChannelPipelineCoverage("all")
>> public class JigDFSCommandDecoder extends FrameDecoder {
>> 
>>     private final static Logger logger = LoggerFactory
>> 	    .getLogger(JigDFSCommandDecoder.class);
>> 
>>     protected Object decode(ChannelHandlerContext ctx,
>>             Channel channel,
>>             ChannelBuffer buf) throws Exception {
>> 	
>> 	if (!buf.readable()) {
>> 	    logger.debug("not readable...");
>> 	    return null;
>> 	}		
>> 
>> 	if (buf.readableBytes() < 8) {
>> 	    logger.debug("smaller than 8 bytes");
>> 	    return null;
>> 	}
>> 
>> 	buf.markReaderIndex();
>> 
>> 	int magicNumber = buf.readInt();
>> 	if (magicNumber != JigDFSCommand.JIGDFS_MAGIC_NUMBER) {
>> 	    logger.info("It's not a JigDFS message! " + buf.readerIndex()
>> 		    + "; readables: " + buf.readableBytes());
>> 
>> 	    buf.resetReaderIndex();
>> 	    return null;
>> 	}
>> 
>> 	int command = buf.readInt();
>> 
>> 	JigDFSCommand jigDFSCommand = new JigDFSCommand(command);
>> 	Object decodedObject = null;
>> 	switch(command){
>> 	case JigDFSCommand.CHUNK_DATA:	    
>> 	    decodedObject = new FileChunkDecoder().decode(buf);	    	    
>> 	    break;
>> 	case JigDFSCommand.SAVE_SLICE:	    
>> 	    decodedObject = new SaveSliceCommandDecoder().decode(buf);
>> 	    break;
>> 	default:
>> 	    decodedObject = buf;
>> 	    break;
>> 	}
>> 	
>> 	if(decodedObject == null){	    
>> 	    buf.resetReaderIndex();
>> 	    return null;
>> 	}
>> 
>> 	jigDFSCommand.setAttachment(decodedObject);	
>> 	
>> 	
>> 	return jigDFSCommand;
>> 
>>     }
>> 
>> }
>> 
>> Thanks,
>> 
>> Jiang 
>> 
>> 
>> 
>> Jiang Bian wrote:
>>> 
>>> Frederic -
>>> 
>>> Thank you for the answer.
>>> 
>>> I agree with you on the first point. I did try to extend FrameDecoder
>>> the very first time; I can't remember why, but it didn't work
>>> (probably for some stupid reason). So I looked at FrameDecoder's source
>>> code and tried to mimic it, but I forgot the "cumulation" part (in
>>> FrameDecoder, if there is not enough data to decode, it is kept in the
>>> cumulation ChannelBuffer). I will definitely go fix that.
>>> 
>>> I didn't merge the two decoders into one simply because I have a couple
>>> of other decoders for different purposes, so I feel it would be hard to
>>> maintain if I put everything in one decoder.
>>> 
>>> Also, I do have a PipelineFactory on the server side.
>>> 
>>> Thanks again,
>>> 
>>> Jiang
>>> 
>>> 
>>> 
>>> Frederic Bregier wrote:
>>>> 
>>>> Jiang,
>>>> 
>>>> From what I've read, here are some remarks.
>>>> Perhaps I did not see everything, but it is a start. I think it is
>>>> normal that you get such errors on transmission; see below why...
>>>> 
>>>> 1) JigDFSCommandDecoder should not extend SimpleChannelUpstreamHandler
>>>> but should probably extend FrameDecoder or ReplayingDecoder (see the
>>>> API for the difference).
>>>> The reason is that you probably want the next decoder to be called
>>>> only when you have a full JigDFSCommand ready to be processed.
>>>> In your code, if you receive only part of the chunk (let's say the
>>>> first 8 bytes), you will continue into the next decoder without
>>>> having the necessary data. Moreover, if you have fewer than 8 bytes
>>>> for some reason (TCP splits data; I don't remember what it is called,
>>>> but it is referred to in the API), you will just ignore this data and
>>>> lose it (by default Netty will not offer you the same bytes again).
>>>> So in both cases you will lose data because you don't store it.
>>>> 
>>>> This does not happen if you use a FrameDecoder (or ReplayingDecoder)
>>>> as the first handler, since it will remember what you have received
>>>> until there is enough for a full chunk. Any remaining data will be
>>>> passed in the next call.
>>>> Take a look at the example in chapter 1.7 of the Netty User Guide.
>>>> 
>>>> 2) The consequence of 1 is that your two decoders will probably be
>>>> merged into one, since the second also decodes some info from the
>>>> ChannelBuffer.
>>>> For instance, you will first have to check the first int, as you do:
>>>> 
>>>> protected Object decode(ChannelHandlerContext ctx, Channel channel,
>>>>         ChannelBuffer input) throws Exception {
>>>>     ...
>>>>     int magicNumber = input.readInt();
>>>>     if (magicNumber != JigDFSCommand.JIGDFS_MAGIC_NUMBER) {
>>>>         logger.info("It's not a JigDFS message! "
>>>>                 + input.readerIndex() + "; readables: "
>>>>                 + input.readableBytes());
>>>>         //logger.debug("Hexdump: " + ChannelBuffers.hexDump(input));
>>>>         // here, for instance, throw an exception in order to close the
>>>>         // channel, or send a message back to the client to say it is wrong
>>>>         throw new MyException("Bad JigDFS, do something with it");
>>>>     }
>>>> 
>>>> Then check the size according to the command you read:
>>>>     int command = input.readInt();
>>>>     if (command != JigDFSCommand.CHUNK_DATA) {
>>>>         return cmd; // or the object you want
>>>>     } else { // try to read the necessary info
>>>>         // segment (4) + md5 (16) + length (4) = 24 bytes
>>>>         if (input.readableBytes() < 24) {
>>>>             // not enough data: rewind so the bytes already read are
>>>>             // offered again (assumes input.markReaderIndex() was called
>>>>             // in the elided part above)
>>>>             input.resetReaderIndex();
>>>>             return null;
>>>>         }
>>>> 
>>>> Now read the following parts...
>>>>         int segment = input.readInt();
>>>> 
>>>>         // read the md5
>>>>         byte[] md5 = new byte[16];
>>>>         input.readBytes(md5);
>>>> 
>>>>         // read the length field
>>>>         int length = input.readInt();
>>>> 
>>>>         // make sure there are enough bytes in the buffer
>>>>         if (input.readableBytes() < length) {
>>>>             input.resetReaderIndex(); // same assumption as above
>>>>             return null;
>>>>         }
>>>>         // now you can read the data
>>>>         byte[] payload = new byte[length];
>>>>         input.readBytes(payload);
>>>> 
>>>>         Chunk chunk = new Chunk(payload, md5, segment);
>>>>         // and return the chunk
>>>>         return chunk;
>>>>     }
>>>> }
>>>> 
>>>> 3) Also, this only works if you use a ChannelPipelineFactory on the
>>>> server side, because each channel needs its own decoder instance
>>>> (since the decoder remembers the unused bytes from the last call).
>>>> This ChannelPipelineFactory should create a new decoder handler for
>>>> each connection.
>>>> So for correctness you should also write
>>>> @ChannelPipelineCoverage("one")
>>>> (it is only "decorative" right now in Netty, but it is intended to
>>>> document what the code is supposed to do).
>>>> 
>>>> 4) Then you just have to insert two handlers: one for the decoder
>>>> (the two decoders in one handler),
>>>> and one for your business logic (the one that takes a Chunk object or
>>>> a JigDFSCommand object as argument).
>>>> 
>>>> HTH,
>>>> Frederic
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 
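
The decode steps discussed in this thread (wait for the 8 header bytes, then
for the 24 bytes of segment + md5 + length, then for the payload, and rewind
whenever something is missing) can be tried without Netty. This is only a
minimal sketch under stated assumptions: java.nio.ByteBuffer stands in for
ChannelBuffer, with mark()/reset() playing the role of
markReaderIndex()/resetReaderIndex(); the MAGIC and CHUNK_DATA values and the
names ChunkFrameSketch, encode and tryDecode are made up for illustration and
are not taken from JigDFS.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ChunkFrameSketch {
    static final int MAGIC = 0x4A494744; // hypothetical, not the real JIGDFS_MAGIC_NUMBER
    static final int CHUNK_DATA = 1;     // hypothetical command code

    // Builds one frame: magic, command, segment, md5 (16 bytes), length, payload.
    static byte[] encode(int segment, byte[] md5, byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(8 + 24 + payload.length);
        out.putInt(MAGIC).putInt(CHUNK_DATA);
        out.putInt(segment).put(md5).putInt(payload.length).put(payload);
        return out.array();
    }

    // Returns the payload of one complete CHUNK_DATA frame, or null when more
    // bytes are needed. When null is returned the buffer position is unchanged,
    // so the same bytes can be offered again once more data has arrived.
    static byte[] tryDecode(ByteBuffer buf) {
        if (buf.remaining() < 8) {
            return null; // magic + command not complete yet
        }
        buf.mark();
        if (buf.getInt() != MAGIC) {
            throw new IllegalStateException("not a JigDFS frame");
        }
        if (buf.getInt() != CHUNK_DATA) {
            throw new IllegalArgumentException("this sketch only handles CHUNK_DATA");
        }
        if (buf.remaining() < 24) { // segment (4) + md5 (16) + length (4)
            buf.reset();
            return null;
        }
        int segment = buf.getInt(); // would go into the Chunk object
        byte[] md5 = new byte[16];  // likewise
        buf.get(md5);
        int length = buf.getInt();
        if (length < 0) {
            throw new IllegalStateException("negative length");
        }
        if (buf.remaining() < length) {
            buf.reset();
            return null;
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode(7, new byte[16], "hello".getBytes(StandardCharsets.US_ASCII));

        // Only the first 20 of 37 bytes have arrived: nothing is decoded or consumed.
        ByteBuffer partial = ByteBuffer.wrap(frame, 0, 20);
        System.out.println(tryDecode(partial) == null); // prints "true"
        System.out.println(partial.position());         // prints "0"

        // The whole frame has arrived.
        byte[] payload = tryDecode(ByteBuffer.wrap(frame));
        System.out.println(new String(payload, StandardCharsets.US_ASCII)); // prints "hello"
    }
}
```

The point of the rewind is that a null return never eats bytes: whoever owns
the buffer can append the next TCP read and simply call tryDecode again.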

-- 
View this message in context: http://n2.nabble.com/How-to-properly-handle-decoder-chain-in-netty--tp3015408p3024355.html
Sent from the Netty User Group mailing list archive at Nabble.com.



