[hornetq-commits] JBoss hornetq SVN: r8338 - branches/20-optimisation/src/main/org/hornetq/integration/transports/netty.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 20 09:35:17 EST 2009


Author: trustin
Date: 2009-11-20 09:35:17 -0500 (Fri, 20 Nov 2009)
New Revision: 8338

Modified:
   branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
Log:
Fixed a bug in HornetQFrameDecoder2 where the leading four-byte length header was omitted

Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2009-11-20 14:20:03 UTC (rev 8337)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2009-11-20 14:35:17 UTC (rev 8338)
@@ -64,7 +64,6 @@
          // treatment because it is the only message that spans over the two
          // buffers.
          final int length;
-         final ChannelBuffer frame;
          switch (previousData.readableBytes()) {
             case 1:
                length = (previousData.getUnsignedByte(previousData.readerIndex()) << 24) |
@@ -72,9 +71,6 @@
                if (in.readableBytes() - 3 < length) {
                   append(in, length);
                   return;
-               } else {
-                  frame = in.slice(in.readerIndex() + 3, length);
-                  in.skipBytes(length + 3);
                }
                break;
             case 2:
@@ -83,9 +79,6 @@
                if (in.readableBytes() - 2 < length) {
                   append(in, length);
                   return;
-               } else {
-                  frame = in.slice(in.readerIndex() + 2, length);
-                  in.skipBytes(length + 2);
                }
                break;
             case 3:
@@ -94,9 +87,6 @@
                if (in.readableBytes() - 1 < length) {
                   append(in, length);
                   return;
-               } else {
-                  frame = in.slice(in.readerIndex() + 1, length);
-                  in.skipBytes(length + 1);
                }
                break;
             case 4:
@@ -104,9 +94,6 @@
                if (in.readableBytes() - 4 < length) {
                   append(in, length);
                   return;
-               } else {
-                  frame = in.slice(in.readerIndex(), length);
-                  in.skipBytes(length);
                }
                break;
             default:
@@ -114,22 +101,23 @@
                if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
                   append(in, length);
                   return;
-               } else {
-                  if (previousData instanceof DynamicChannelBuffer) {
-                     // It's safe to reuse the current dynamic buffer
-                     // because previousData will be reassigned to
-                     // EMPTY_BUFFER or 'in' later.
-                     previousData.skipBytes(4);
-                     previousData.writeBytes(in, length - previousData.readableBytes());
-                     frame = previousData.slice();
-                  } else {
-                     frame = ChannelBuffers.buffer(length);
-                     frame.writeBytes(previousData, previousData.readerIndex() + 4, previousData.readableBytes() - 4);
-                     frame.writeBytes(in, length - frame.writerIndex());
-                  }
                }
          }
          
+         final ChannelBuffer frame;
+         if (previousData instanceof DynamicChannelBuffer) {
+            // It's safe to reuse the current dynamic buffer
+            // because previousData will be reassigned to
+            // EMPTY_BUFFER or 'in' later.
+            previousData.writeBytes(in, length + 4 - previousData.readableBytes());
+            frame = previousData.slice();
+         } else {
+            frame = ChannelBuffers.buffer(length + 4);
+            frame.writeBytes(previousData, previousData.readerIndex(), previousData.readableBytes());
+            frame.writeBytes(in, length + 4 - frame.writerIndex());
+         }
+
+         frame.skipBytes(4);
          Channels.fireMessageReceived(ctx, frame);
 
          if (!in.readable()) {
@@ -167,8 +155,9 @@
             break;
          }
          
-         ChannelBuffer frame = in.slice(in.readerIndex() + SIZE_INT, length);
+         ChannelBuffer frame = in.slice(in.readerIndex(), length + SIZE_INT);
          in.skipBytes(SIZE_INT + length);
+         frame.skipBytes(SIZE_INT);
          Channels.fireMessageReceived(ctx, frame);
       }
    }
@@ -179,6 +168,7 @@
       // a client is very slow. (e.g.sending each byte one by one)
       if (previousData instanceof DynamicChannelBuffer)
       {
+         previousData.discardReadBytes();
          previousData.writeBytes(in);
       }
       else
@@ -206,27 +196,32 @@
             }
          });
       }
+      
+      final int MSG_LEN = 40;
 
       final DecoderEmbedder<ChannelBuffer> decoder =
          new DecoderEmbedder<ChannelBuffer>(handler);
 
-      ChannelBuffer src = ChannelBuffers.buffer(30000 * 1004);
+      ChannelBuffer src = ChannelBuffers.buffer(30000 * (MSG_LEN + 4));
       while (src.writerIndex() < src.capacity()) {
-         src.writeInt(1000);
-         src.writeZero(1000);
+         src.writeInt(MSG_LEN);
+         for (int i = 0; i < MSG_LEN; i ++) {
+            src.writeByte((byte) i);
+         }
+         //src.writeZero(1000);
       }
 
       Random rand = new Random();
       List<ChannelBuffer> packets = new ArrayList<ChannelBuffer>();
       for (int i = 0; i < src.capacity();) {
-         int length = Math.min(rand.nextInt(3000), src.capacity() - i);
+         int length = Math.min(rand.nextInt(10000), src.capacity() - i);
          packets.add(src.copy(i, length));
          i += length;
       }
       
       long startTime = System.nanoTime();
          
-      for (int i = 0; i < 100; i ++) {
+      for (int i = 0; i < 10; i ++) {
          int cnt = 0;
          for (ChannelBuffer p: packets) {
             decoder.offer(p.duplicate());
@@ -235,9 +230,14 @@
                if (frame == null) {
                   break;
                }
-               if (frame.readableBytes() != 1000) {
+               if (frame.readableBytes() != MSG_LEN) {
                   System.out.println("ARGH 1: " + frame.readableBytes());
                }
+               for (int j = 0; j < MSG_LEN; j ++) {
+                  if (frame.readByte() != (byte) j) {
+                     System.out.println("ARGH 3");
+                  }
+               }
                cnt ++;
             }
          }



More information about the hornetq-commits mailing list