Author: trustin
Date: 2009-11-20 22:07:33 -0500 (Fri, 20 Nov 2009)
New Revision: 8355
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
Log:
Resolved issue: HORNETQ-221 (Refactor HornetQFrameDecoder and FrameDecoder)
* Rewrote HornetQFrameDecoder
** Outperforms previous implementation by eliminating unnecessary memory copy
** Always generates a dynamic buffer
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-20
23:25:02 UTC (rev 8354)
+++
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-21
03:07:33 UTC (rev 8355)
@@ -47,7 +47,7 @@
public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler
handler)
{
assert pipeline != null;
- pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+ pipeline.addLast("decoder", new HornetQFrameDecoder());
}
public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext
context, final boolean client) throws Exception
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2009-11-20
23:25:02 UTC (rev 8354)
+++
trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java 2009-11-21
03:07:33 UTC (rev 8355)
@@ -15,15 +15,17 @@
import static org.hornetq.utils.DataConstants.SIZE_INT;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.BufferHandler;
import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
- * A Netty FrameDecoder used to decode messages.
+ * A Netty decoder used to decode messages.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
@@ -31,34 +33,151 @@
*
* @version $Revision$, $Date$
*/
-public class HornetQFrameDecoder extends FrameDecoder
+@ChannelPipelineCoverage("one")
+public class HornetQFrameDecoder extends SimpleChannelUpstreamHandler
{
- private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
+ private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
- private final BufferHandler handler;
-
- public HornetQFrameDecoder(final BufferHandler handler)
- {
- this.handler = handler;
- }
-
- // FrameDecoder overrides
+ // SimpleChannelUpstreamHandler overrides
//
-------------------------------------------------------------------------------------
@Override
- protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in)
throws Exception
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
{
- // TODO - we can avoid this entirely if we maintain fragmented packets in the
handler
- int start = in.readerIndex();
+ ChannelBuffer in = (ChannelBuffer) e.getMessage();
+ if (previousData.readable())
+ {
+ if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
+ // XXX Length is unknown. Bet at 512. Tune this value.
+ append(in, 512);
+ return;
+ }
+
+ // Decode the first message. The first message requires a special
+ // treatment because it is the only message that spans over the two
+ // buffers.
+ final int length;
+ switch (previousData.readableBytes()) {
+ case 1:
+ length = (previousData.getUnsignedByte(previousData.readerIndex())
<< 24) |
+ in.getMedium(in.readerIndex());
+ if (in.readableBytes() - 3 < length) {
+ append(in, length);
+ return;
+ }
+ break;
+ case 2:
+ length = (previousData.getUnsignedShort(previousData.readerIndex())
<< 16) |
+ in.getUnsignedShort(in.readerIndex());
+ if (in.readableBytes() - 2 < length) {
+ append(in, length);
+ return;
+ }
+ break;
+ case 3:
+ length = (previousData.getUnsignedMedium(previousData.readerIndex())
<< 8) |
+ in.getUnsignedByte(in.readerIndex());
+ if (in.readableBytes() - 1 < length) {
+ append(in, length);
+ return;
+ }
+ break;
+ case 4:
+ length = previousData.getInt(previousData.readerIndex());
+ if (in.readableBytes() - 4 < length) {
+ append(in, length);
+ return;
+ }
+ break;
+ default:
+ length = previousData.getInt(previousData.readerIndex());
+ if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
+ append(in, length);
+ return;
+ }
+ }
+
+ final ChannelBuffer frame;
+ if (previousData instanceof DynamicChannelBuffer) {
+ // It's safe to reuse the current dynamic buffer
+ // because previousData will be reassigned to
+ // EMPTY_BUFFER or 'in' later.
+ previousData.writeBytes(in, length + 4 - previousData.readableBytes());
+ frame = previousData;
+ } else {
+ // XXX Tune this value: Increasing the initial capacity of the
+ // dynamic buffer might reduce the chance of additional memory
+ // copy.
+ frame = ChannelBuffers.dynamicBuffer(length + 4);
+ frame.writeBytes(previousData, previousData.readerIndex(),
previousData.readableBytes());
+ frame.writeBytes(in, length + 4 - frame.writerIndex());
+ }
- int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
- if (length == -1)
+ frame.skipBytes(4);
+ Channels.fireMessageReceived(ctx, frame);
+
+ if (!in.readable()) {
+ previousData = ChannelBuffers.EMPTY_BUFFER;
+ return;
+ }
+ }
+
+ // And then handle the rest - we don't need to deal with the
+ // composite buffer anymore because the second or later messages
+ // always belong to the second buffer.
+ decode(ctx, in);
+
+ // Handle the leftover.
+ if (in.readable())
{
- in.readerIndex(start);
- return null;
+ previousData = in;
}
-
- in.readerIndex(start + SIZE_INT);
- return in.readBytes(length);
+ else
+ {
+ previousData = ChannelBuffers.EMPTY_BUFFER;
+ }
}
+
+ private void decode(ChannelHandlerContext ctx, ChannelBuffer in)
+ {
+ for (;;) {
+ final int readableBytes = in.readableBytes();
+ if (readableBytes < SIZE_INT) {
+ break;
+ }
+
+ final int length = in.getInt(in.readerIndex());
+ if (readableBytes < length + SIZE_INT) {
+ break;
+ }
+
+ // Convert to dynamic buffer (this requires copy)
+ // XXX Tune this value: Increasing the initial capacity of the dynamic
+ // buffer might reduce the chance of additional memory copy.
+ ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + SIZE_INT);
+ frame.writeBytes(in, length + SIZE_INT);
+ frame.skipBytes(SIZE_INT);
+ Channels.fireMessageReceived(ctx, frame);
+ }
+ }
+
+ private void append(ChannelBuffer in, int length)
+ {
+ // Need more data to decode the first message. This can happen when
+ // a client is very slow. (e.g.sending each byte one by one)
+ if (previousData instanceof DynamicChannelBuffer)
+ {
+ previousData.discardReadBytes();
+ previousData.writeBytes(in);
+ }
+ else
+ {
+ ChannelBuffer newPreviousData =
+ ChannelBuffers.dynamicBuffer(
+ Math.max(previousData.readableBytes() + in.readableBytes(), length +
4));
+ newPreviousData.writeBytes(previousData);
+ newPreviousData.writeBytes(in);
+ previousData = newPreviousData;
+ }
+ }
}
Show replies by date