Zero Copy
Sergey S
sergey.saleev at gmail.com
Mon May 17 10:18:09 EDT 2010
Hi
FYI, there is my optimized version of LengthFieldBasedFrameDecoder:
import org.jboss.netty.channel.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import java.net.SocketAddress;
public class LengthFieldBasedFrameDecoder implements ChannelUpstreamHandler
{
private final int maxFrameLength;
private final int lengthFieldOffset;
private final int lengthFieldLength;
private volatile int lengthBytesToRead;
private volatile ChannelBuffer lengthBuffer;
private volatile long frameBytesToRead;
private volatile ChannelBuffer frameBuffer;
private volatile boolean skipFrame;
public LengthFieldBasedFrameDecoder(int maxFrameLength, int
lengthFieldOffset, int lengthFieldLength)
{
if (maxFrameLength < 0)
throw new IllegalArgumentException("negative maxFrameLength: " +
maxFrameLength);
if (lengthFieldOffset < 0)
throw new IllegalArgumentException("negative lengthFieldOffset:
" + lengthFieldOffset);
if (lengthFieldLength < 1 || (lengthFieldLength > 4 &&
lengthFieldLength != 8))
throw new IllegalArgumentException("lengthFieldLength not in set
1..4, 8: " + lengthFieldLength);
this.maxFrameLength = maxFrameLength;
this.lengthFieldOffset = lengthFieldOffset;
this.lengthFieldLength = lengthFieldLength;
}
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent
event) throws Exception
{
if (event instanceof MessageEvent)
{
MessageEvent msgEvent = (MessageEvent) event;
Object msg = msgEvent.getMessage();
if (msg instanceof ChannelBuffer)
{
callDecode(ctx, (ChannelBuffer) msg,
msgEvent.getRemoteAddress());
return;
}
}
else if (event instanceof ChannelStateEvent)
{
ChannelStateEvent stateEvent = (ChannelStateEvent) event;
if (stateEvent.getState() == ChannelState.CONNECTED)
{
if (stateEvent.getValue() != null)
{
lengthBytesToRead = lengthFieldOffset +
lengthFieldLength;
lengthBuffer =
getBuffer(ctx.getChannel().getConfig().getBufferFactory(),
lengthBytesToRead);
}
}
}
ctx.sendUpstream(event);
}
private void callDecode(ChannelHandlerContext ctx, ChannelBuffer buffer,
SocketAddress remoteAddress)
throws Exception
{
while (buffer.readableBytes() > 0)
{
Object o = decode(ctx, buffer);
if (o != null) Channels.fireMessageReceived(ctx, o,
remoteAddress);
}
}
protected Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer)
throws Exception
{
if (lengthBytesToRead > 0)
{
if (lengthBytesToRead > buffer.readableBytes())
{
lengthBytesToRead -= buffer.readableBytes();
lengthBuffer.writeBytes(buffer);
return null;
}
else
{
lengthBuffer.writeBytes(buffer, lengthBytesToRead);
lengthBytesToRead = 0;
switch (lengthFieldLength)
{
case 1: frameBytesToRead =
lengthBuffer.getUnsignedByte(lengthFieldOffset); break;
case 2: frameBytesToRead =
lengthBuffer.getUnsignedShort(lengthFieldOffset); break;
case 3: frameBytesToRead =
lengthBuffer.getUnsignedMedium(lengthFieldOffset); break;
case 4: frameBytesToRead =
lengthBuffer.getUnsignedInt(lengthFieldOffset); break;
case 8: frameBytesToRead =
lengthBuffer.getLong(lengthFieldOffset); break;
default: throw new Error("incorrect lengthFieldLength");
}
if (frameBytesToRead < 0)
{
skipFrame = true;
frameBytesToRead = 0;
Channels.fireExceptionCaught(ctx, new
CorruptedFrameException("negative frame length: " + frameBytesToRead));
}
else if (frameBytesToRead > maxFrameLength)
{
skipFrame = true;
Channels.fireExceptionCaught(ctx, new
TooLongFrameException("frame length exceeds " + maxFrameLength + ": " +
frameBytesToRead));
}
else
{
skipFrame = false;
frameBuffer =
getBuffer(ctx.getChannel().getConfig().getBufferFactory(), (int)
frameBytesToRead);
}
}
}
if (frameBytesToRead > buffer.readableBytes())
{
frameBytesToRead -= buffer.readableBytes();
if (skipFrame)
{
buffer.skipBytes(buffer.readableBytes());
}
else
{
frameBuffer.writeBytes(buffer);
}
return null;
}
else
{
lengthBuffer.setIndex(0, 0);
lengthBytesToRead = lengthFieldOffset + lengthFieldLength;
if (skipFrame)
{
buffer.skipBytes((int) frameBytesToRead);
frameBytesToRead = 0;
return null;
}
else
{
frameBuffer.writeBytes(buffer, (int) frameBytesToRead);
frameBytesToRead = 0;
final ChannelBuffer result = frameBuffer;
frameBuffer = null;
return result;
}
}
}
protected ChannelBuffer getBuffer(ChannelBufferFactory factory, int
capacity)
{
return factory.getBuffer(capacity);
}
}
--
View this message in context: http://netty-forums-and-mailing-lists.685743.n2.nabble.com/Zero-Copy-tp4951824p5065308.html
Sent from the Netty User Group mailing list archive at Nabble.com.
More information about the netty-users
mailing list