[hornetq-commits] JBoss hornetq SVN: r8398 - trunk/src/main/org/hornetq/integration/transports/netty.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 24 19:56:16 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-24 19:56:16 -0500 (Tue, 24 Nov 2009)
New Revision: 8398

Modified:
   trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
Log:
Reverting Trustin's commit for now. Those changes were supposed to be made on the optimization branch.

Modified: trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2009-11-25 00:54:37 UTC (rev 8397)
+++ trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2009-11-25 00:56:16 UTC (rev 8398)
@@ -47,7 +47,7 @@
    public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
    {
       assert pipeline != null;
-      pipeline.addLast("decoder", new HornetQFrameDecoder());
+      pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
    }
 
    public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception

Modified: trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2009-11-25 00:54:37 UTC (rev 8397)
+++ trunk/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2009-11-25 00:56:16 UTC (rev 8398)
@@ -15,17 +15,15 @@
 
 import static org.hornetq.utils.DataConstants.SIZE_INT;
 
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.BufferHandler;
 import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.buffer.DynamicChannelBuffer;
+import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
 
 /**
- * A Netty decoder used to decode messages.
+ * A Netty FrameDecoder used to decode messages.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
@@ -33,151 +31,34 @@
  *
  * @version $Revision$, $Date$
  */
-@ChannelPipelineCoverage("one")
-public class HornetQFrameDecoder extends SimpleChannelUpstreamHandler
+public class HornetQFrameDecoder extends FrameDecoder
 {
-   private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
+   private static final Logger log = Logger.getLogger(HornetQFrameDecoder.class);
 
-   // SimpleChannelUpstreamHandler overrides
+   private final BufferHandler handler;
+
+   public HornetQFrameDecoder(final BufferHandler handler)
+   {
+      this.handler = handler;
+   }
+
+   // FrameDecoder overrides
    // -------------------------------------------------------------------------------------
 
    @Override
-   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+   protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception
    {
-      ChannelBuffer in = (ChannelBuffer) e.getMessage();
-      if (previousData.readable())
-      {
-         if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
-            // XXX Length is unknown. Bet at 512. Tune this value.
-            append(in, 512); 
-            return;
-         }
-         
-         // Decode the first message.  The first message requires a special
-         // treatment because it is the only message that spans over the two
-         // buffers.
-         final int length;
-         switch (previousData.readableBytes()) {
-            case 1:
-               length = (previousData.getUnsignedByte(previousData.readerIndex()) << 24) |
-                        in.getMedium(in.readerIndex());
-               if (in.readableBytes() - 3 < length) {
-                  append(in, length);
-                  return;
-               }
-               break;
-            case 2:
-               length = (previousData.getUnsignedShort(previousData.readerIndex()) << 16) |
-                        in.getUnsignedShort(in.readerIndex());
-               if (in.readableBytes() - 2 < length) {
-                  append(in, length);
-                  return;
-               }
-               break;
-            case 3:
-               length = (previousData.getUnsignedMedium(previousData.readerIndex()) << 8) |
-                        in.getUnsignedByte(in.readerIndex());
-               if (in.readableBytes() - 1 < length) {
-                  append(in, length);
-                  return;
-               }
-               break;
-            case 4:
-               length = previousData.getInt(previousData.readerIndex());
-               if (in.readableBytes() - 4 < length) {
-                  append(in, length);
-                  return;
-               }
-               break;
-            default:
-               length = previousData.getInt(previousData.readerIndex());
-               if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
-                  append(in, length);
-                  return;
-               }
-         }
-         
-         final ChannelBuffer frame;
-         if (previousData instanceof DynamicChannelBuffer) {
-            // It's safe to reuse the current dynamic buffer
-            // because previousData will be reassigned to
-            // EMPTY_BUFFER or 'in' later.
-            previousData.writeBytes(in, length + 4 - previousData.readableBytes());
-            frame = previousData;
-         } else {
-            // XXX Tune this value: Increasing the initial capacity of the
-            //     dynamic buffer might reduce the chance of additional memory
-            //     copy.
-            frame = ChannelBuffers.dynamicBuffer(length + 4);
-            frame.writeBytes(previousData, previousData.readerIndex(), previousData.readableBytes());
-            frame.writeBytes(in, length + 4 - frame.writerIndex());
-         }
+      // TODO - we can avoid this entirely if we maintain fragmented packets in the handler
+      int start = in.readerIndex();
 
-         frame.skipBytes(4);
-         Channels.fireMessageReceived(ctx, frame);
-
-         if (!in.readable()) {
-            previousData = ChannelBuffers.EMPTY_BUFFER;
-            return;
-         }
-      }
-
-      // And then handle the rest - we don't need to deal with the
-      // composite buffer anymore because the second or later messages
-      // always belong to the second buffer.
-      decode(ctx, in);
-      
-      // Handle the leftover.
-      if (in.readable())
+      int length = handler.isReadyToHandle(new ChannelBufferWrapper(in));
+      if (length == -1)
       {
-         previousData = in;
+         in.readerIndex(start);
+         return null;
       }
-      else
-      {
-         previousData = ChannelBuffers.EMPTY_BUFFER;
-      }
+
+      in.readerIndex(start + SIZE_INT);
+      return in.readBytes(length);
    }
-   
-   private void decode(ChannelHandlerContext ctx, ChannelBuffer in)
-   {
-      for (;;) {
-         final int readableBytes = in.readableBytes();
-         if (readableBytes < SIZE_INT) {
-            break;
-         }
-         
-         final int length = in.getInt(in.readerIndex());
-         if (readableBytes < length + SIZE_INT) {
-            break;
-         }
-         
-         // Convert to dynamic buffer (this requires copy)
-         // XXX Tune this value: Increasing the initial capacity of the dynamic
-         //     buffer might reduce the chance of additional memory copy.
-         ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + SIZE_INT);
-         frame.writeBytes(in, length + SIZE_INT);
-         frame.skipBytes(SIZE_INT);
-         Channels.fireMessageReceived(ctx, frame);
-      }
-   }
-   
-   private void append(ChannelBuffer in, int length)
-   {
-      // Need more data to decode the first message. This can happen when
-      // a client is very slow. (e.g.sending each byte one by one)
-      if (previousData instanceof DynamicChannelBuffer)
-      {
-         previousData.discardReadBytes();
-         previousData.writeBytes(in);
-      }
-      else
-      {
-         ChannelBuffer newPreviousData =
-              ChannelBuffers.dynamicBuffer(
-                    Math.max(previousData.readableBytes() + in.readableBytes(), length + 4));
-         newPreviousData.writeBytes(previousData);
-         newPreviousData.writeBytes(in);
-         previousData = newPreviousData;
-      }
-   }
 }



More information about the hornetq-commits mailing list