Code example to write asynchronous server

yanky young yanky.young at gmail.com
Thu Apr 16 01:47:12 EDT 2009


Hi:

I think your understanding about FrameEncoder is quite right. I am not sure
why your channel buffer capacity is 1024. Yes, ChannelBuffer passed from
decode method is assigned by netty framework, but framework would not set
limit on this. framework will create ChannelBuffer with the size of the
reveiced message as needed.

As for client programming language, that's not the issue, as long as you
send the message with consistent format as the server.

In my test code, I use this kind of pipeline:

        ChannelPipeline pipeline = pipeline();
        pipeline.addLast("framer", new LengthFieldBasedFrameDecoder(20480,
0, 4, 0 ,4));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("lengthHeaderEncoder", new
LengthFieldPrepender(4));
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", myAppHandler);

this pipeline can automatically add a header to your message sent, and
automatically strip length header field when received. please note that
length field length is the same as the initial bytes to strip size here.

Of course you can do all of these things in your own FrameDecoder, but
separation of concerns is better.

yanky


2009/4/15 manishr22 <manishr22 at gmail.com>

>
> Hi yanky,
>
> The problem remains the same. I see my ChannelBuffer capacity is always
> 1024 though i am writing bigger data from client.So that again I see the
> same problem as before.
>
> Could you please help me in understanding the flow of decode() method?
> Though i read it but if you explain that may correct my understanding.
>
> As i understood, whenever buffer.resetReaderIndex(); encountered, decode()
> method get called again, this means my message-length and header-length
> variable will again re-initialize with the first 4-4 bytes respectively of
> the current buffer readIndex position. which should never happen. once i red
> the header and message-length my application should first read that message
> and then only go for next message reading operation.
>
> say my client is sending data like
>
> [MESSAGE 1][4 byte message length][4 bytes header length][header][body]
> [MESSAGE 2][4 byte message length][4 bytes header length][header][body]
> [MESSAGE 3] [4 byte message length][4 bytes header
> length][header][body].......
>
> Since multiple clients will se sending data in same fashion so my Netty
> server should understand them all.
>
> Note: Right now my test client is not written using netty api and simply
> writing message on socket, will that make any difference?
>
> I am really sorry to bug more, since i am learning Netty, so i have so many
> questions.
>
> Regards
> Manish
>
>
>
>
> Hi:
>
> Maybe you can have a look at ChannelBuffer interface document. I am not
> sure
> I have got it right. But I am trying to correct your code according to my
> understanding. Some resetReaderIndex methods have been commented out for
> correction. You can try it. Please let me know if it works.
>
> good luck
>
> yanky
>
>
> public class LoggingAgentMessageDecoder extends FrameDecoder {
>
>       private int maxMsgLenToRead = Constants.LOG_AGENT_MAXIMUM_MSG_LEN;
>
>       private Logger logger = Logger.getLogger(this.getClass());
>
>       @Override
>        protected Object decode(ChannelHandlerContext ctx, Channel channel,
>                        ChannelBuffer buffer) throws Exception {
>
>               ChannelBuffer[] frame = new ChannelBuffer[2];
>
>               // Make sure if the body length field was received.
>               if (buffer.readableBytes() < 4) {
>                       return null;
>               }
>               // The length field is in the buffer.
>
>               // Mark the current buffer position before reading the length
> field
>               buffer.markReaderIndex();
>               // Read the length field.
>               int messageLength = buffer.readInt();
>               logger.info("Total message lenth is " + messageLength);
>
>               // Make sure if the message length field was received.
>               if (buffer.readableBytes() < 4) {
>                       // if not enough bytes, we should reset reader index
> to ensure we can read header length field again at the next time when this
> decode method is invoked
>                       buffer.resetReaderIndex();
>                       return null;
>               }
>               // buffer.markReaderIndex(); // not needed here
>               int headerLength = buffer.readInt();
>               logger.info("Total message lenth is " + headerLength);
>
>               // buffer.resetReaderIndex(); // not needed here
>               if (buffer.readableBytes() < headerLength) {
>                       // The whole bytes were not received yet - return
> null.
>                       // This method will be invoked again when more
> packets are
>                       // received and appended to the buffer.
>
>                       // Reset to the marked position to read the length
> field again
>                       // next time.
>                       buffer.resetReaderIndex(); // yes, this time we need
> to reset reader index to read all bytes again
>
>                       return null;
>               }
>               frame[0] = buffer.readBytes(headerLength);
>               // buffer.resetReaderIndex();  // not needed here
>               System.out.println("Bufer capacity is " + buffer.capacity());
>               if (buffer.readableBytes() < messageLength) {
>                       // The whole bytes were not received yet - return
> null.
>                       // This method will be invoked again when more
> packets are
>                       // received and appended to the buffer.
>
>                       // Reset to the marked position to read the length
> field again
>                       // next time.
>
>                       // control is coming inside this since the capacity
> fo channelbuffer
>                       // is always less then the total message length.
>                       buffer.resetReaderIndex(); // yes, again we need to
> reset
>
>                       return null;
>               }
>
>               // There's enough bytes in the buffer. Read it.
>               frame[1] = buffer.readBytes(messageLength);
>               // TODO: need to add the logic to get the data parsed and add
> into
>               // activeAMQ
>               return frame;
>       }
> }
>
>
> 2009/4/14 manishr22 <manishr22 at gmail.com>
>
> >
> > Hi,
> >
> > Still this one is not able to solve this problem.
> >
> > The message comes from my server has proper format like:
> >
> > First 4 byte has information about body length. for example 1173
> > Next 4 byte has length of header for example 92
> > now in next 92 bytes i have my message header.
> > and then from 100th byte onward i have my message body till 1173 byte.
> >
> > To solve this I have written my decoder and tried solving it like:
> >
> > public class LoggingAgentMessageDecoder extends FrameDecoder {
> >
> >        private int maxMsgLenToRead = Constants.LOG_AGENT_MAXIMUM_MSG_LEN;
> >
> >        private Logger logger = Logger.getLogger(this.getClass());
> >
> >        @Override
> >         protected Object decode(ChannelHandlerContext ctx, Channel
> channel,
> >                         ChannelBuffer buffer) throws Exception {
> >
> >                ChannelBuffer[] frame = new ChannelBuffer[2];
> >
> >                // Make sure if the message length field was received.
> >                if (buffer.readableBytes() < 4) {
> >                        return null;
> >                }
> >                // The length field is in the buffer.
> >
> >                // Mark the current buffer position before reading the
> > length field
> >                buffer.markReaderIndex();
> >                // Read the length field.
> >                int messageLength = buffer.readInt();
> >                logger.info("Total message lenth is " + messageLength);
> >
> >                // Make sure if the message length field was received.
> >                if (buffer.readableBytes() < 4) {
> >                        return null;
> >                }
> >                buffer.markReaderIndex();
> >                int headerLength = buffer.readInt();
> >                logger.info("Total message lenth is " + headerLength);
> >
> >                buffer.resetReaderIndex();
> >                if (buffer.readableBytes() < headerLength) {
> >                        // The whole bytes were not received yet - return
> > null.
> >                        // This method will be invoked again when more
> > packets are
> >                        // received and appended to the buffer.
> >
> >                        // Reset to the marked position to read the length
> > field again
> >                        // next time.
> >                        buffer.resetReaderIndex();
> >
> >                        return null;
> >                }
> >                frame[0] = buffer.readBytes(headerLength);
> >                buffer.resetReaderIndex();
> >                System.out.println("Bufer capacity is " +
> > buffer.capacity());
> >                if (buffer.readableBytes() < messageLength) {
> >                        // The whole bytes were not received yet - return
> > null.
> >                        // This method will be invoked again when more
> > packets are
> >                        // received and appended to the buffer.
> >
> >                        // Reset to the marked position to read the length
> > field again
> >                        // next time.
> >
> >                        // control is coming inside this since the
> capacity
> > fo channelbuffer
> >                        // is always less then the total message length.
> >                        buffer.resetReaderIndex();
> >
> >                        return null;
> >                }
> >
> >                // There's enough bytes in the buffer. Read it.
> >                frame[1] = buffer.readBytes(messageLength);
> >                // TODO: need to add the logic to get the data parsed and
> > add into
> >                // activeAMQ
> >                return frame;
> >        }
> > }
> >
> >
> > But the problem in this i see is whenever the controls come to if
> statement
> > which goes to read my body i see buffer.readableBytes() is always 1020
> and
> > since my body length is greater than this it goes inside the if condition
> > and call the method buffer.resetReaderIndex().
> >
> > This actually recall the whole decode method again and then re-initialize
> > the value for message-length and header-length to the wrong value like
> > message length as 92 and after that 262145.
> >
> > and this all mess up completely. for my testing from client i am actually
> > sending only one data which message header of 92 byte and body as 1173
> > bytes.
> >
> > so i am not sure how come in different iteration the message length gets
> > increased.
> >
> > i see always the capacity the the ChannelBuffer comes as 1024 though i am
> > not setting the limit to channelBuffer.
> >
> >
> > Please suggest what wrong i am doing?
> >
> > Regards
> > Manish
> >
> >
> > Hi:
> >
> > You have a schema in your message. So while encoding and decoding, you
> have
> > to recognize a complete request or response message. A common practice is
> > to
> > use another delimiter that's usually not used in protocol message to mark
> > the boundary of different protocol messages. Fortunately, netty do has
> > out-of-box support for this problem. You can have a look at
> example.telnet
> > package and also codec.frame package to see how
> DelimiterBasedFrameDecoder
> > can work for you.
> >
> > Another consideration is that, you have a xml message, of course it is
> > string, but you can build your own encoder and decoder to translate POJO
> > from/to xml string and then xml string from/to ChannelBuffer. That would
> be
> > better. I think it would be like this:
> >
> > in your PipelineFactory:
> >
> > // this factory should be used for both client and server to setup
> pipeline
> > public ChannelPipeline getPipeline() throws Exception {
> >        ChannelPipeline pipeline = pipeline();
> >
> >        final int xmlMessageSizeLimit  = 8192; // can be modified to meet
> > your needs
> >        // Add the text line codec combination first,
> >        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
> >                xmlMessageSizeLimit, Delimiters.nulDelimiter()));
> >        pipeline.addLast("decoder", new MyProtocolDecoder());
> >        pipeline.addLast("encoder", new MyProtocolEncoder());
> >
> >        // and then business logic.
> >        pipeline.addLast("handler", handler);
> > }
> >
> > and then define your own encoder and decoder:
> >
> > public class MyProtocolDecoder extends StringDecoder{
> >      //just override decode method
> >      protected Object decode(
> >            ChannelHandlerContext ctx, Channel channel, Object msg) throws
> > Exception {
> >        String xml = super(ctx, channel, msg);
> >        // then use any xml parser or xml binding tool to map xml to your
> > POJO, I use XStream as example
> >        XStream mapper = new XStream();
> >        mapper.alias("header", MyProtocolMessage.MyProtocolHeader.class);
> >        mapper.alias("body", MyProtocolMessage.MyProtocolBody.class);
> >        MyProtocolMessage protoMsg = mapper.fromXML(xml);
> >        return protoMsg;
> >    }
> > }
> >
> > public class MyProtocolMessage {
> >   MyProtocolHeader header;
> >   MyProtocolBody body;
> >
> >   static class MyProtocolHeader {......}
> >   static class MyProtocolBody {......}
> >
> > }
> >
> > you should do the reverse process in MyProtocolEncoder
> >
> > Hope it helps.
> >
> > good luck
> >
> > yanky
> >
> >
> > 2009/4/13 manishr22 <manishr22 at gmail.com>
> >
> > >
> > > Hi,
> > >
> > > How I can read a chunk of data from channel using netty api? my client
> is
> > > writing every time on socket
> > >
> > > <manish> welcome to the real world</manish>
> > >
> > > and my netty server needs to read the message till </manish> in one go.
> > and
> > > then next message starting from <manish> should be read by Netty
> server.
> > >
> > >
> > > Any code snippet would be a great help.
> > >
> > > In my decoder if i read the message from channelbuffer using getbytes()
> > api
> > > i see the data but not able to get i can read the whole message in one
> > go. i
> > > do see sometime some extra message being read from channel. any reason
> > why
> > > it should be.
> > >
> > >
> > >
> > > Thanks in advance.
> > > Regards
> > > Manish
> > >
> > > Hi Yanky,
> > >
> > > Thanks so much for your guidance. I will look into factorial example
> will
> > > solving my problem based on this.
> > >
> > > For any further help, if needed i will post my request here.
> > >
> > > Thanks again
> > >
> > > Manish
> > >
> > >
> > >
> > > Thanks Yanky for the great answer!  So nice that you are here as a
> > > part of the community.
> > >
> > > Cheers,
> > >
> > > — Trustin Lee, http://gleamynode.net/
> > >
> > > On Sat, Apr 11, 2009 at 3:16 AM, yanky young <yanky.young at gmail.com>
> > > wrote:
> > > > Hi:
> > > >
> > > > I am not netty expert yet. I am just trying to give some hints. In
> your
> > > > case, you are actually building a request/response protocol. And
> > request
> > > and
> > > > response message may include quite complicated headers and body. So
> you
> > > have
> > > > to do some message encoding and decoding in both server side and
> client
> > > > side. You can see in netty codebase there are some out-of-box encoder
> > and
> > > > decoder in handler.codec package. I suggest you take a look at
> > > > example.fractorial package to see how to implement your own encoder
> and
> > > > decoder for your message format which may be xml. If you are not
> > > sensitive
> > > > to message format, you can also have a look at example.localtime
> which
> > > use
> > > > google protobuf library as real encoder and decoder.
> > > >
> > > > good luck
> > > >
> > > > yanky
> > > >
> > > >
> > > >
> > > > 2009/4/11 manishr22 <manishr22 at gmail.com>
> > > >>
> > > >> Hi Yanky,
> > > >>
> > > >> I took the sample code from svn trunk and saw the codes too. Could
> you
> > > >> please let me know which one among given sample code will fit for
> the
> > > >> reference of my requirement.
> > > >>
> > > >> Regards,
> > > >> Manish
> > > >>
> > > >> Hi:
> > > >>
> > > >> Of course there are some sample code in netty project. You can just
> > > check
> > > >> out the svn trunk and see some samples in example package.
> > > >>
> > > >> good luck
> > > >>
> > > >> yanky
> > > >>
> > > >>
> > > >> 2009/4/10 manishr22 <manishr22 at gmail.com>
> > > >>
> > > >> >
> > > >> > Hi,
> > > >> >
> > > >> > I had a problem statement
> > > >> >
> > > >> > "to write an asynchronous TCP server to which multiple clients
> will
> > > >> > connect
> > > >> > and keep sending messages to my tcp server. My server should read
> > the
> > > >> > message and send back some acknowledgment information to the
> client
> > > who
> > > >> > sent
> > > >> > the data. While reading the data at server since the message comes
> > in
> > > >> > particular format i have to do some processing before sending the
> > > >> > response
> > > >> > back to client."
> > > >> >
> > > >> > say my client is sending a message structure in which it has some
> > > header
> > > >> > and then in body it has message like :
> > > >> >
> > > >> > <manish>welcome to the real world</manish>
> > > >> >
> > > >> > I was on the way to implement NIO but in between i found such a
> > great
> > > >> > library netty and thought of using it. i was trying to make use of
> > > this
> > > >> > library with the given tutorial and examples but i found that
> > tutorial
> > > >> > is
> > > >> > not good enough to solve the complex problem.
> > > >> >
> > > >> > If you can provide me some guidance or sample code that how i
> should
> > > >> > solve
> > > >> > my problem using netty library, that would be great.
> > > >> >
> > > >> > Thanks in advance.
> > > >> >
> > > >> > Regards
> > > >> > Manish
> > > >> > --
> > > >> > View this message in context:
> > > >> >
> > > >> >
> > >
> >
> http://n2.nabble.com/Code-example-to-write-asynchronous-server-tp2615889p2615889.html
> > > >> > Sent from the Netty User Group mailing list archive at Nabble.com.
> > > >> >
> > > >> > _______________________________________________
> > > >> > netty-users mailing list
> > > >> > netty-users at lists.jboss.org
> > > >> > https://lists.jboss.org/mailman/listinfo/netty-users
> > > >> >
> > > >>
> > > >> _______________________________________________
> > > >> netty-users mailing list
> > > >> netty-users at lists.jboss.org
> > > >> https://lists.jboss.org/mailman/listinfo/netty-users
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> View this message in context:
> > > >>
> > >
> >
> http://n2.nabble.com/Code-example-to-write-asynchronous-server-tp2615889p2617267.html
> > > >> Sent from the Netty User Group mailing list archive at Nabble.com.
> > > >>
> > > >> _______________________________________________
> > > >> netty-users mailing list
> > > >> netty-users at lists.jboss.org
> > > >> https://lists.jboss.org/mailman/listinfo/netty-users
> > > >
> > > >
> > > > _______________________________________________
> > > > netty-users mailing list
> > > > netty-users at lists.jboss.org
> > > > https://lists.jboss.org/mailman/listinfo/netty-users
> > > >
> > > >
> > >
> > > _______________________________________________
> > > netty-users mailing list
> > > netty-users at lists.jboss.org
> > > https://lists.jboss.org/mailman/listinfo/netty-users
> > >
> > >
> > >
> > >
> > >
> > > --
> > > View this message in context:
> > >
> >
> http://n2.nabble.com/Code-example-to-write-asynchronous-server-tp2615889p2627351.html
> > > Sent from the Netty User Group mailing list archive at Nabble.com.
> > >
> > >
> > > _______________________________________________
> > > netty-users mailing list
> > > netty-users at lists.jboss.org
> > > https://lists.jboss.org/mailman/listinfo/netty-users
> > >
> >
> > _______________________________________________
> > netty-users mailing list
> > netty-users at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/netty-users
> >
> >
> >
> > --
> > View this message in context:
> >
> http://n2.nabble.com/Code-example-to-write-asynchronous-server-tp2615889p2632783.html
> > Sent from the Netty User Group mailing list archive at Nabble.com.
> >
> >
> > _______________________________________________
> > netty-users mailing list
> > netty-users at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/netty-users
> >
>
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users
>
>
>
> --
> View this message in context:
> http://n2.nabble.com/Code-example-to-write-asynchronous-server-tp2615889p2636949.html
> Sent from the Netty User Group mailing list archive at Nabble.com.
>
>
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090416/d92afd64/attachment.html 


More information about the netty-users mailing list