Author: clebert.suconic(a)jboss.com
Date: 2009-11-24 19:56:16 -0500 (Tue, 24 Nov 2009)
New Revision: 8398
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
Log:
Reverting Trustin's commit for now. Those changes were supposed to be made on the
optimization branch.
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-25
00:54:37 UTC (rev 8397)
+++
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-25
00:56:16 UTC (rev 8398)
@@ -47,7 +47,7 @@
public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler
handler)
{
assert pipeline != null;
- pipeline.addLast("decoder", new HornetQFrameDecoder());
+ pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
}
public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext
context, final boolean client) throws Exception
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2009-11-25
00:54:37 UTC (rev 8397)
+++
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2009-11-25
00:56:16 UTC (rev 8398)
@@ -15,17 +15,15 @@
import static org.hornetq.utils.DataConstants.SIZE_INT;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.BufferHandler;
import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.buffer.DynamicChannelBuffer;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
- * A Netty decoder used to decode messages.
+ * A Netty FrameDecoder used to decode messages.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
@@ -33,151 +31,34 @@
*
* @version $Revision$, $Date$
*/
-@ChannelPipelineCoverage("one")
-public class HornetQFrameDecoder extends SimpleChannelUpstreamHandler
+public class HornetQFrameDecoder extends FrameDecoder
{
- private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
+ private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
- // SimpleChannelUpstreamHandler overrides
+ private final BufferHandler handler;
+
+ public HornetQFrameDecoder(final BufferHandler handler)
+ {
+ this.handler = handler;
+ }
+
+ // FrameDecoder overrides
//
-------------------------------------------------------------------------------------
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in)
throws Exception
{
- ChannelBuffer in = (ChannelBuffer) e.getMessage();
- if (previousData.readable())
- {
- if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
- // XXX Length is unknown. Bet at 512. Tune this value.
- append(in, 512);
- return;
- }
-
- // Decode the first message. The first message requires a special
- // treatment because it is the only message that spans over the two
- // buffers.
- final int length;
- switch (previousData.readableBytes()) {
- case 1:
- length = (previousData.getUnsignedByte(previousData.readerIndex())
<< 24) |
- in.getMedium(in.readerIndex());
- if (in.readableBytes() - 3 < length) {
- append(in, length);
- return;
- }
- break;
- case 2:
- length = (previousData.getUnsignedShort(previousData.readerIndex())
<< 16) |
- in.getUnsignedShort(in.readerIndex());
- if (in.readableBytes() - 2 < length) {
- append(in, length);
- return;
- }
- break;
- case 3:
- length = (previousData.getUnsignedMedium(previousData.readerIndex())
<< 8) |
- in.getUnsignedByte(in.readerIndex());
- if (in.readableBytes() - 1 < length) {
- append(in, length);
- return;
- }
- break;
- case 4:
- length = previousData.getInt(previousData.readerIndex());
- if (in.readableBytes() - 4 < length) {
- append(in, length);
- return;
- }
- break;
- default:
- length = previousData.getInt(previousData.readerIndex());
- if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
- append(in, length);
- return;
- }
- }
-
- final ChannelBuffer frame;
- if (previousData instanceof DynamicChannelBuffer) {
- // It's safe to reuse the current dynamic buffer
- // because previousData will be reassigned to
- // EMPTY_BUFFER or 'in' later.
- previousData.writeBytes(in, length + 4 - previousData.readableBytes());
- frame = previousData;
- } else {
- // XXX Tune this value: Increasing the initial capacity of the
- // dynamic buffer might reduce the chance of additional memory
- // copy.
- frame = ChannelBuffers.dynamicBuffer(length + 4);
- frame.writeBytes(previousData, previousData.readerIndex(),
previousData.readableBytes());
- frame.writeBytes(in, length + 4 - frame.writerIndex());
- }
+ // TODO - we can avoid this entirely if we maintain fragmented packets in the
handler
+ int start = in.readerIndex();
- frame.skipBytes(4);
- Channels.fireMessageReceived(ctx, frame);
-
- if (!in.readable()) {
- previousData = ChannelBuffers.EMPTY_BUFFER;
- return;
- }
- }
-
- // And then handle the rest - we don't need to deal with the
- // composite buffer anymore because the second or later messages
- // always belong to the second buffer.
- decode(ctx, in);
-
- // Handle the leftover.
- if (in.readable())
+ int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+ if (length == -1)
{
- previousData = in;
+ in.readerIndex(start);
+ return null;
}
- else
- {
- previousData = ChannelBuffers.EMPTY_BUFFER;
- }
+
+ in.readerIndex(start + SIZE_INT);
+ return in.readBytes(length);
}
-
- private void decode(ChannelHandlerContext ctx, ChannelBuffer in)
- {
- for (;;) {
- final int readableBytes = in.readableBytes();
- if (readableBytes < SIZE_INT) {
- break;
- }
-
- final int length = in.getInt(in.readerIndex());
- if (readableBytes < length + SIZE_INT) {
- break;
- }
-
- // Convert to dynamic buffer (this requires copy)
- // XXX Tune this value: Increasing the initial capacity of the dynamic
- // buffer might reduce the chance of additional memory copy.
- ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + SIZE_INT);
- frame.writeBytes(in, length + SIZE_INT);
- frame.skipBytes(SIZE_INT);
- Channels.fireMessageReceived(ctx, frame);
- }
- }
-
- private void append(ChannelBuffer in, int length)
- {
- // Need more data to decode the first message. This can happen when
- // a client is very slow. (e.g.sending each byte one by one)
- if (previousData instanceof DynamicChannelBuffer)
- {
- previousData.discardReadBytes();
- previousData.writeBytes(in);
- }
- else
- {
- ChannelBuffer newPreviousData =
- ChannelBuffers.dynamicBuffer(
- Math.max(previousData.readableBytes() + in.readableBytes(), length +
4));
- newPreviousData.writeBytes(previousData);
- newPreviousData.writeBytes(in);
- previousData = newPreviousData;
- }
- }
}