Author: trustin
Date: 2009-11-20 09:43:39 -0500 (Fri, 20 Nov 2009)
New Revision: 8340
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
Log:
HornetQFrameDecoder2 always generates a dynamic buffer now.
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20
14:38:08 UTC (rev 8339)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20
14:43:39 UTC (rev 8340)
@@ -110,9 +110,9 @@
// because previousData will be reassigned to
// EMPTY_BUFFER or 'in' later.
previousData.writeBytes(in, length + 4 - previousData.readableBytes());
- frame = previousData.slice();
+ frame = previousData;
} else {
- frame = ChannelBuffers.buffer(length + 4);
+ frame = ChannelBuffers.dynamicBuffer(length + 4);
frame.writeBytes(previousData, previousData.readerIndex(),
previousData.readableBytes());
frame.writeBytes(in, length + 4 - frame.writerIndex());
}
@@ -155,8 +155,9 @@
break;
}
- ChannelBuffer frame = in.slice(in.readerIndex(), length + SIZE_INT);
- in.skipBytes(SIZE_INT + length);
+ // Convert to dynamic buffer (this requires copy)
+ ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + SIZE_INT);
+ frame.writeBytes(in, length + SIZE_INT);
frame.skipBytes(SIZE_INT);
Channels.fireMessageReceived(ctx, frame);
}
@@ -174,81 +175,11 @@
else
{
ChannelBuffer newPreviousData =
- ChannelBuffers.dynamicBuffer(
- Math.max(previousData.readableBytes() + in.readableBytes(), length +
4));
+ ChannelBuffers.dynamicBuffer(
+ Math.max(previousData.readableBytes() + in.readableBytes(), length +
4));
newPreviousData.writeBytes(previousData);
newPreviousData.writeBytes(in);
previousData = newPreviousData;
}
}
-
- public static void main(String[] args) throws Exception {
- final boolean useNextGeneration = true;
- final ChannelUpstreamHandler handler;
-
- if (useNextGeneration) {
- handler = new HornetQFrameDecoder2();
- } else {
- handler = new HornetQFrameDecoder(new AbstractBufferHandler()
- {
- public void bufferReceived(Object connectionID, HornetQBuffer buffer)
- { // noop
- }
- });
- }
-
- final int MSG_LEN = 40;
-
- final DecoderEmbedder<ChannelBuffer> decoder =
- new DecoderEmbedder<ChannelBuffer>(handler);
-
- ChannelBuffer src = ChannelBuffers.buffer(30000 * (MSG_LEN + 4));
- while (src.writerIndex() < src.capacity()) {
- src.writeInt(MSG_LEN);
- for (int i = 0; i < MSG_LEN; i ++) {
- src.writeByte((byte) i);
- }
- //src.writeZero(1000);
- }
-
- Random rand = new Random();
- List<ChannelBuffer> packets = new ArrayList<ChannelBuffer>();
- for (int i = 0; i < src.capacity();) {
- int length = Math.min(rand.nextInt(10000), src.capacity() - i);
- packets.add(src.copy(i, length));
- i += length;
- }
-
- long startTime = System.nanoTime();
-
- for (int i = 0; i < 10; i ++) {
- int cnt = 0;
- for (ChannelBuffer p: packets) {
- decoder.offer(p.duplicate());
- for (;;) {
- ChannelBuffer frame = decoder.poll();
- if (frame == null) {
- break;
- }
- if (frame.readableBytes() != MSG_LEN) {
- System.out.println("ARGH 1: " + frame.readableBytes());
- }
- for (int j = 0; j < MSG_LEN; j ++) {
- if (frame.readByte() != (byte) j) {
- System.out.println("ARGH 3");
- }
- }
- cnt ++;
- }
- }
- if (cnt != 30000) {
- System.out.println("ARGH 2: " + cnt);
- }
- }
-
- long endTime = System.nanoTime();
- System.out.println(
- handler.getClass().getSimpleName() + ": " +
- (endTime - startTime) / 1000000);
- }
}