Extent based channel buffer with predictable capacity extension.

"이희승 (Trustin Lee)" trustin at gmail.com
Tue Oct 25 20:25:40 EDT 2011


Thank you Kevin. I've just left a comment.

Kevin Burton wrote:
 > https://issues.jboss.org/browse/NETTY-446
 >
 > created.... should be an easy change.
 >
 > On Thu, Oct 20, 2011 at 4:38 PM, Trustin Lee <trustin at gmail.com
 > <mailto:trustin at gmail.com>> wrote:
 >
 > I like the idea of limiting the delta between the old and new
 > capacity, and such parameter should be part of DynamicBuffer.
 > Would you mind if I ask you to file a JIRA issue for it?
 >
 > Thanks,
 > Trustin
 >
 > --
 > Trustin Lee
 > Sent with Sparrow <http://www.sparrowmailapp.com/?sig>
 >
 > On Wednesday, October 19, 2011 at 12:04 PM, Kevin Burton wrote:
 >
 >> ... more on this.
 >>
 >> 1. ignore the FIXME in ensureWritableBytes ... or just delete
 >> that line if you compile it.
 >>
 >> 2. this is about 2x slower than a direct heap buffer. The
 >> overhead comes from the composite buffer.
 >>
 >> On Wed, Oct 19, 2011 at 11:31 AM, Kevin Burton
 >> <burtonator at gmail.com <mailto:burtonator at gmail.com>> wrote:
 >>>
 >>> Hey gang.
 >>>
 >>> I've been working on a map reduce implementation (which I'm
 >>> about to OSS) and I've started to use ChannelBuffers extensively
 >>> in the framework.
 >>>
 >>> Anyway... some of the buffers I'm dealing with are hundreds of
 >>> MBs... I need extent based extension on some of them because
 >>> going from 128MB to 256MB very quick isn't fun when I'm trying
 >>> to conserve memory.
 >>>
 >>> This makes a SMALL change to DynamicChannelBuffer.
 >>>
 >>> Basically, I give it an initial capacity of say 128MB and then
 >>> give it extents of 2MB . When it's out of memory it will
 >>> increase capacity by 2MB. ... so worse case scenario I waste
 >>> 2MB of memory.
 >>>
 >>> I could re-implement this VERY easily with say 30 lines of code
 >>> if we could change 'buffer' in DynamicChannelBuffer to
 >>> 'protected' ....
 >>>
 >>> If you would accept this change I'll just submit a new patch
 >>> changing this to protected and then the new
 >>> ExtendedChannelBuffer ...
 >>>
 >>> Thoughts?
 >>>
 >>>
 >>>
 >>> package peregrine.util;
 >>>
 >>> import java.io <http://java.io>.*;
 >>> import java.util.*;
 >>>
 >>> import org.jboss.netty.buffer.*;
 >>>
 >>> import java.io.IOException;
 >>> import java.io.InputStream;
 >>> import java.io.OutputStream;
 >>> import java.util.ArrayList;
 >>> import java.util.List;
 >>> import java.nio.ByteBuffer;
 >>> import java.nio.ByteOrder;
 >>> import java.nio.channels.GatheringByteChannel;
 >>> import java.nio.channels.ScatteringByteChannel;
 >>>
 >>> /**
 >>> * A dynamic capacity buffer which increases its capacity as
 >>> needed. It is
 >>> * recommended to use {@link ChannelBuffers#dynamicBuffer(int)}
 >>> instead of
 >>> * calling the constructor explicitly.
 >>> *
 >>> * @author <a href="http://www.jboss.org/netty/">The Netty
 >>> Project</a>
 >>> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
 >>> *
 >>> * @version $Rev: 2206M $, $Date: 2010-11-09 15:04:10 +0900
 >>> (Tue, 09 Nov 2010) $
 >>> *
 >>> */
 >>> public class ExtendedChannelBuffer extends AbstractChannelBuffer {
 >>>
 >>> private final ChannelBufferFactory factory;
 >>> private final ByteOrder endianness;
 >>> private ChannelBuffer buffer;
 >>>
 >>> private List<ChannelBuffer> extents = new ArrayList();
 >>>
 >>> private int extentLength;
 >>>
 >>> private int initialCapacity;
 >>> public ExtendedChannelBuffer(int initialCapacity, int
 >>> extentLength) {
 >>> this(ByteOrder.BIG_ENDIAN, initialCapacity, extentLength);
 >>> }
 >>>
 >>> public ExtendedChannelBuffer(ByteOrder endianness, int
 >>> initialCapacity, int extentLength) {
 >>> this(endianness, initialCapacity, extentLength,
 >>> HeapChannelBufferFactory.getInstance(endianness));
 >>> }
 >>>
 >>> public ExtendedChannelBuffer(ByteOrder endianness, int
 >>> initialCapacity, int extentLength, ChannelBufferFactory factory) {
 >>> if (extentLength < 0) {
 >>> throw new IllegalArgumentException("extentLength: "
 >>> + extentLength);
 >>> }
 >>> if (endianness == null) {
 >>> throw new NullPointerException("endianness");
 >>> }
 >>> if (factory == null) {
 >>> throw new NullPointerException("factory");
 >>> }
 >>> this.factory = factory;
 >>> this.endianness = endianness;
 >>> this.extentLength = extentLength;
 >>>
 >>> extend( initialCapacity );
 >>> }
 >>>
 >>> @Override
 >>> public void ensureWritableBytes(int minWritableBytes) {
 >>>
 >>> if (minWritableBytes <= writableBytes()) {
 >>> return;
 >>> }
 >>>
 >>> extend( extentLength );
 >>>
 >>> }
 >>>
 >>> private void extend( int newExtentLength ) {
 >>>
 >>> ChannelBuffer newExtent = factory().getBuffer(order(),
 >>> newExtentLength);
 >>> extents.add( newExtent );
 >>>
 >>> System.out.printf( "FIXME: allocating extent: %s\n",
 >>> newExtentLength );
 >>> buffer = new CompositeChannelBuffer( endianness, extents );
 >>> }
 >>> public ChannelBufferFactory factory() {
 >>> return factory;
 >>> }
 >>>
 >>> public ByteOrder order() {
 >>> return endianness;
 >>> }
 >>>
 >>> public boolean isDirect() {
 >>> return buffer.isDirect();
 >>> }
 >>>
 >>> public int capacity() {
 >>> return buffer.capacity();
 >>> }
 >>>
 >>> public boolean hasArray() {
 >>> return buffer.hasArray();
 >>> }
 >>>
 >>> public byte[] array() {
 >>> return buffer.array();
 >>> }
 >>>
 >>> public int arrayOffset() {
 >>> return buffer.arrayOffset();
 >>> }
 >>>
 >>> public byte getByte(int index) {
 >>> return buffer.getByte(index);
 >>> }
 >>>
 >>> public short getShort(int index) {
 >>> return buffer.getShort(index);
 >>> }
 >>>
 >>> public int getUnsignedMedium(int index) {
 >>> return buffer.getUnsignedMedium(index);
 >>> }
 >>>
 >>> public int getInt(int index) {
 >>> return buffer.getInt(index);
 >>> }
 >>>
 >>> public long getLong(int index) {
 >>> return buffer.getLong(index);
 >>> }
 >>>
 >>> public void getBytes(int index, byte[] dst, int dstIndex,
 >>> int length) {
 >>> buffer.getBytes(index, dst, dstIndex, length);
 >>> }
 >>>
 >>> public void getBytes(int index, ChannelBuffer dst, int
 >>> dstIndex, int length) {
 >>> buffer.getBytes(index, dst, dstIndex, length);
 >>> }
 >>>
 >>> public void getBytes(int index, ByteBuffer dst) {
 >>> buffer.getBytes(index, dst);
 >>> }
 >>>
 >>> public int getBytes(int index, GatheringByteChannel out, int
 >>> length)
 >>> throws IOException {
 >>> return buffer.getBytes(index, out, length);
 >>> }
 >>>
 >>> public void getBytes(int index, OutputStream out, int length)
 >>> throws IOException {
 >>> buffer.getBytes(index, out, length);
 >>> }
 >>>
 >>> public void setByte(int index, int value) {
 >>> buffer.setByte(index, value);
 >>> }
 >>>
 >>> public void setShort(int index, int value) {
 >>> buffer.setShort(index, value);
 >>> }
 >>>
 >>> public void setMedium(int index, int value) {
 >>> buffer.setMedium(index, value);
 >>> }
 >>>
 >>> public void setInt(int index, int value) {
 >>> buffer.setInt(index, value);
 >>> }
 >>>
 >>> public void setLong(int index, long value) {
 >>> buffer.setLong(index, value);
 >>> }
 >>>
 >>> public void setBytes(int index, byte[] src, int srcIndex,
 >>> int length) {
 >>> buffer.setBytes(index, src, srcIndex, length);
 >>> }
 >>>
 >>> public void setBytes(int index, ChannelBuffer src, int
 >>> srcIndex, int length) {
 >>> buffer.setBytes(index, src, srcIndex, length);
 >>> }
 >>>
 >>> public void setBytes(int index, ByteBuffer src) {
 >>> buffer.setBytes(index, src);
 >>> }
 >>>
 >>> public int setBytes(int index, InputStream in, int length)
 >>> throws IOException {
 >>> return buffer.setBytes(index, in, length);
 >>> }
 >>>
 >>> public int setBytes(int index, ScatteringByteChannel in, int
 >>> length)
 >>> throws IOException {
 >>> return buffer.setBytes(index, in, length);
 >>> }
 >>>
 >>> @Override
 >>> public void writeByte(int value) {
 >>> ensureWritableBytes(1);
 >>> super.writeByte(value);
 >>> }
 >>>
 >>> @Override
 >>> public void writeShort(int value) {
 >>> ensureWritableBytes(2);
 >>> super.writeShort(value);
 >>> }
 >>>
 >>> @Override
 >>> public void writeMedium(int value) {
 >>> ensureWritableBytes(3);
 >>> super.writeMedium(value);
 >>> }
 >>>
 >>> @Override
 >>> public void writeInt(int value) {
 >>> ensureWritableBytes(4);
 >>> super.writeInt(value);
 >>> }
 >>>
 >>> @Override
 >>> public void writeLong(long value) {
 >>> ensureWritableBytes(8);
 >>> super.writeLong(value);
 >>> }
 >>>
 >>> @Override
 >>> public void writeBytes(byte[] src, int srcIndex, int length) {
 >>> ensureWritableBytes(length);
 >>> super.writeBytes(src, srcIndex, length);
 >>> }
 >>>
 >>> @Override
 >>> public void writeBytes(ChannelBuffer src, int srcIndex, int
 >>> length) {
 >>> ensureWritableBytes(length);
 >>> super.writeBytes(src, srcIndex, length);
 >>> }
 >>>
 >>> @Override
 >>> public void writeBytes(ByteBuffer src) {
 >>> ensureWritableBytes(src.remaining());
 >>> super.writeBytes(src);
 >>> }
 >>>
 >>> @Override
 >>> public int writeBytes(InputStream in, int length) throws
 >>> IOException {
 >>> ensureWritableBytes(length);
 >>> return super.writeBytes(in, length);
 >>> }
 >>>
 >>> @Override
 >>> public int writeBytes(ScatteringByteChannel in, int length)
 >>> throws IOException {
 >>> ensureWritableBytes(length);
 >>> return super.writeBytes(in, length);
 >>> }
 >>>
 >>> @Override
 >>> public void writeZero(int length) {
 >>> ensureWritableBytes(length);
 >>> super.writeZero(length);
 >>> }
 >>>
 >>> public ChannelBuffer duplicate() {
 >>> return new DuplicatedChannelBuffer(this);
 >>> }
 >>>
 >>> public ChannelBuffer copy(int index, int length) {
 >>> ExtendedChannelBuffer copiedBuffer = new
 >>> ExtendedChannelBuffer(order(), Math.max(length, 64),
 >>> extentLength, factory());
 >>> copiedBuffer.buffer = buffer.copy(index, length);
 >>> copiedBuffer.setIndex(0, length);
 >>> return copiedBuffer;
 >>> }
 >>>
 >>> public ChannelBuffer slice(int index, int length) {
 >>> if (index == 0) {
 >>> if (length == 0) {
 >>> return ChannelBuffers.EMPTY_BUFFER;
 >>> }
 >>> return new TruncatedChannelBuffer(this, length);
 >>> } else {
 >>> if (length == 0) {
 >>> return ChannelBuffers.EMPTY_BUFFER;
 >>> }
 >>> return new SlicedChannelBuffer(this, index, length);
 >>> }
 >>> }
 >>>
 >>> public ByteBuffer toByteBuffer(int index, int length) {
 >>> return buffer.toByteBuffer(index, length);
 >>> }
 >>> }
 >>>
 >>
 >> _______________________________________________
 >> netty-users mailing list
 >> netty-users at lists.jboss.org <mailto:netty-users at lists.jboss.org>
 >> https://lists.jboss.org/mailman/listinfo/netty-users
 >
 >
 > _______________________________________________
 > netty-users mailing list
 > netty-users at lists.jboss.org <mailto:netty-users at lists.jboss.org>
 > https://lists.jboss.org/mailman/listinfo/netty-users
 >
 >
 >
 >
 > --
 >
 > Founder/CEO Spinn3r.com <http://Spinn3r.com>
 >
 > Location: *San Francisco, CA*
 > Skype: *burtonator*
 >
 > Skype-in: *(415) 871-0687*
 >
 >
 > _______________________________________________
 > netty-users mailing list
 > netty-users at lists.jboss.org
 > https://lists.jboss.org/mailman/listinfo/netty-users
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20111025/c444a505/attachment-0001.html 


More information about the netty-users mailing list