Author: trustin
Date: 2009-11-20 09:35:17 -0500 (Fri, 20 Nov 2009)
New Revision: 8338
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
Log:
Fixed a bug in HornetQFrameDecoder2 where the leading four-byte length header was omitted from the decoded frame buffer
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20
14:20:03 UTC (rev 8337)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20
14:35:17 UTC (rev 8338)
@@ -64,7 +64,6 @@
// treatment because it is the only message that spans over the two
// buffers.
final int length;
- final ChannelBuffer frame;
switch (previousData.readableBytes()) {
case 1:
length = (previousData.getUnsignedByte(previousData.readerIndex())
<< 24) |
@@ -72,9 +71,6 @@
if (in.readableBytes() - 3 < length) {
append(in, length);
return;
- } else {
- frame = in.slice(in.readerIndex() + 3, length);
- in.skipBytes(length + 3);
}
break;
case 2:
@@ -83,9 +79,6 @@
if (in.readableBytes() - 2 < length) {
append(in, length);
return;
- } else {
- frame = in.slice(in.readerIndex() + 2, length);
- in.skipBytes(length + 2);
}
break;
case 3:
@@ -94,9 +87,6 @@
if (in.readableBytes() - 1 < length) {
append(in, length);
return;
- } else {
- frame = in.slice(in.readerIndex() + 1, length);
- in.skipBytes(length + 1);
}
break;
case 4:
@@ -104,9 +94,6 @@
if (in.readableBytes() - 4 < length) {
append(in, length);
return;
- } else {
- frame = in.slice(in.readerIndex(), length);
- in.skipBytes(length);
}
break;
default:
@@ -114,22 +101,23 @@
if (in.readableBytes() + previousData.readableBytes() - 4 < length) {
append(in, length);
return;
- } else {
- if (previousData instanceof DynamicChannelBuffer) {
- // It's safe to reuse the current dynamic buffer
- // because previousData will be reassigned to
- // EMPTY_BUFFER or 'in' later.
- previousData.skipBytes(4);
- previousData.writeBytes(in, length - previousData.readableBytes());
- frame = previousData.slice();
- } else {
- frame = ChannelBuffers.buffer(length);
- frame.writeBytes(previousData, previousData.readerIndex() + 4,
previousData.readableBytes() - 4);
- frame.writeBytes(in, length - frame.writerIndex());
- }
}
}
+ final ChannelBuffer frame;
+ if (previousData instanceof DynamicChannelBuffer) {
+ // It's safe to reuse the current dynamic buffer
+ // because previousData will be reassigned to
+ // EMPTY_BUFFER or 'in' later.
+ previousData.writeBytes(in, length + 4 - previousData.readableBytes());
+ frame = previousData.slice();
+ } else {
+ frame = ChannelBuffers.buffer(length + 4);
+ frame.writeBytes(previousData, previousData.readerIndex(),
previousData.readableBytes());
+ frame.writeBytes(in, length + 4 - frame.writerIndex());
+ }
+
+ frame.skipBytes(4);
Channels.fireMessageReceived(ctx, frame);
if (!in.readable()) {
@@ -167,8 +155,9 @@
break;
}
- ChannelBuffer frame = in.slice(in.readerIndex() + SIZE_INT, length);
+ ChannelBuffer frame = in.slice(in.readerIndex(), length + SIZE_INT);
in.skipBytes(SIZE_INT + length);
+ frame.skipBytes(SIZE_INT);
Channels.fireMessageReceived(ctx, frame);
}
}
@@ -179,6 +168,7 @@
// a client is very slow. (e.g.sending each byte one by one)
if (previousData instanceof DynamicChannelBuffer)
{
+ previousData.discardReadBytes();
previousData.writeBytes(in);
}
else
@@ -206,27 +196,32 @@
}
});
}
+
+ final int MSG_LEN = 40;
final DecoderEmbedder<ChannelBuffer> decoder =
new DecoderEmbedder<ChannelBuffer>(handler);
- ChannelBuffer src = ChannelBuffers.buffer(30000 * 1004);
+ ChannelBuffer src = ChannelBuffers.buffer(30000 * (MSG_LEN + 4));
while (src.writerIndex() < src.capacity()) {
- src.writeInt(1000);
- src.writeZero(1000);
+ src.writeInt(MSG_LEN);
+ for (int i = 0; i < MSG_LEN; i ++) {
+ src.writeByte((byte) i);
+ }
+ //src.writeZero(1000);
}
Random rand = new Random();
List<ChannelBuffer> packets = new ArrayList<ChannelBuffer>();
for (int i = 0; i < src.capacity();) {
- int length = Math.min(rand.nextInt(3000), src.capacity() - i);
+ int length = Math.min(rand.nextInt(10000), src.capacity() - i);
packets.add(src.copy(i, length));
i += length;
}
long startTime = System.nanoTime();
- for (int i = 0; i < 100; i ++) {
+ for (int i = 0; i < 10; i ++) {
int cnt = 0;
for (ChannelBuffer p: packets) {
decoder.offer(p.duplicate());
@@ -235,9 +230,14 @@
if (frame == null) {
break;
}
- if (frame.readableBytes() != 1000) {
+ if (frame.readableBytes() != MSG_LEN) {
System.out.println("ARGH 1: " + frame.readableBytes());
}
+ for (int j = 0; j < MSG_LEN; j ++) {
+ if (frame.readByte() != (byte) j) {
+ System.out.println("ARGH 3");
+ }
+ }
cnt ++;
}
}