[hornetq-commits] JBoss hornetq SVN: r8355 - trunk/src/main/org/hornetq/integration/transports/netty.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 20 22:07:34 EST 2009


Author: trustin
Date: 2009-11-20 22:07:33 -0500 (Fri, 20 Nov 2009)
New Revision: 8355

Modified:
   trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
Log:
Resolved issue: HORNETQ-221 (Refactor HornetQFrameDecoder and FrameDecoder)
* Rewrote HornetQFrameDecoder
** Outperforms previous implementation by eliminating unnecessary memory copy
** Always generates a dynamic buffer


Modified: trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2009-11-20 23:25:02 UTC (rev 8354)
+++ trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2009-11-21 03:07:33 UTC (rev 8355)
@@ -47,7 +47,7 @@
    public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
    {
       assert pipeline != null;
-      pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+      pipeline.addLast("decoder", new HornetQFrameDecoder());
    }
 
    public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception

Modified: trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2009-11-20 23:25:02 UTC (rev 8354)
+++ trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2009-11-21 03:07:33 UTC (rev 8355)
@@ -15,15 +15,17 @@
 
 import static org.hornetq.utils.DataConstants.SIZE_INT;
 
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.BufferHandler;
 import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
 /**
- * A Netty FrameDecoder used to decode messages.
+ * A Netty decoder used to decode messages.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -31,34 +33,151 @@
  *
  * @version $Revision$, $Date$
  */
-public class HornetQFrameDecoder extends FrameDecoder
+ at ChannelPipelineCoverage("one")
+public class HornetQFrameDecoder extends SimpleChannelUpstreamHandler
 {
-   private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
+   private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
 
-   private final BufferHandler handler;
-
-   public HornetQFrameDecoder(final BufferHandler handler)
-   {
-      this.handler = handler;
-   }
-
-   // FrameDecoder overrides
+   // SimpleChannelUpstreamHandler overrides
    // -------------------------------------------------------------------------------------
 
    @Override
-   protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception
+   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
    {
-      // TODO - we can avoid this entirely if we maintain fragmented packets in the handler
-      int start = in.readerIndex();
+      ChannelBuffer in = (ChannelBuffer) e.getMessage();
+      if (previousData.readable())
+      {
+         if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
+            // XXX Length is unknown. Bet at 512. Tune this value.
+            append(in, 512); 
+            return;
+         }
+         
+         // Decode the first message.  The first message requires a special
+         // treatment because it is the only message that spans over the two
+         // buffers.
+         final int length;
+         switch (previousData.readableBytes()) {
+            case 1:
+               length = (previousData.getUnsignedByte(previousData.readerIndex()) << 24) |
+                        in.getMedium(in.readerIndex());
+               if (in.readableBytes() - 3 < length) {
+                  append(in, length);
+                  return;
+               }
+               break;
+            case 2:
+               length = (previousData.getUnsignedShort(previousData.readerIndex()) << 16) |
+                        in.getUnsignedShort(in.readerIndex());
+               if (in.readableBytes() - 2 < length) {
+                  append(in, length);
+                  return;
+               }
+               break;
+            case 3:
+               length = (previousData.getUnsignedMedium(previousData.readerIndex()) << 8) |
+                        in.getUnsignedByte(in.readerIndex());
+               if (in.readableBytes() - 1 < length) {
+                  append(in, length);
+                  return;
+               }
+               break;
+            case 4:
+               length = previousData.getInt(previousData.readerIndex());
+               if (in.readableBytes() - 4 < length) {
+                  append(in, length);
+                  return;
+               }
+               break;
+            default:
+               length = previousData.getInt(previousData.readerIndex());
+               if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
+                  append(in, length);
+                  return;
+               }
+         }
+         
+         final ChannelBuffer frame;
+         if (previousData instanceof DynamicChannelBuffer) {
+            // It's safe to reuse the current dynamic buffer
+            // because previousData will be reassigned to
+            // EMPTY_BUFFER or 'in' later.
+            previousData.writeBytes(in, length + 4 - previousData.readableBytes());
+            frame = previousData;
+         } else {
+            // XXX Tune this value: Increasing the initial capacity of the
+            //     dynamic buffer might reduce the chance of additional memory
+            //     copy.
+            frame = ChannelBuffers.dynamicBuffer(length + 4);
+            frame.writeBytes(previousData, previousData.readerIndex(), previousData.readableBytes());
+            frame.writeBytes(in, length + 4 - frame.writerIndex());
+         }
 
-      int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
-      if (length == -1)
+         frame.skipBytes(4);
+         Channels.fireMessageReceived(ctx, frame);
+
+         if (!in.readable()) {
+            previousData = ChannelBuffers.EMPTY_BUFFER;
+            return;
+         }
+      }
+
+      // And then handle the rest - we don't need to deal with the
+      // composite buffer anymore because the second or later messages
+      // always belong to the second buffer.
+      decode(ctx, in);
+      
+      // Handle the leftover.
+      if (in.readable())
       {
-         in.readerIndex(start);
-         return null;
+         previousData = in;
       }
-
-      in.readerIndex(start + SIZE_INT);
-      return in.readBytes(length);
+      else
+      {
+         previousData = ChannelBuffers.EMPTY_BUFFER;
+      }
    }
+   
+   private void decode(ChannelHandlerContext ctx, ChannelBuffer in)
+   {
+      for (;;) {
+         final int readableBytes = in.readableBytes();
+         if (readableBytes < SIZE_INT) {
+            break;
+         }
+         
+         final int length = in.getInt(in.readerIndex());
+         if (readableBytes < length + SIZE_INT) {
+            break;
+         }
+         
+         // Convert to dynamic buffer (this requires copy)
+         // XXX Tune this value: Increasing the initial capacity of the dynamic
+         //     buffer might reduce the chance of additional memory copy.
+         ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + SIZE_INT);
+         frame.writeBytes(in, length + SIZE_INT);
+         frame.skipBytes(SIZE_INT);
+         Channels.fireMessageReceived(ctx, frame);
+      }
+   }
+   
+   private void append(ChannelBuffer in, int length)
+   {
+      // Need more data to decode the first message. This can happen when
+      // a client is very slow. (e.g.sending each byte one by one)
+      if (previousData instanceof DynamicChannelBuffer)
+      {
+         previousData.discardReadBytes();
+         previousData.writeBytes(in);
+      }
+      else
+      {
+         ChannelBuffer newPreviousData =
+              ChannelBuffers.dynamicBuffer(
+                    Math.max(previousData.readableBytes() + in.readableBytes(), length + 4));
+         newPreviousData.writeBytes(previousData);
+         newPreviousData.writeBytes(in);
+         previousData = newPreviousData;
+      }
+   }
 }



More information about the hornetq-commits mailing list