Extent based channel buffer with predictable capacity extension.
"이희승 (Trustin Lee)"
trustin at gmail.com
Tue Oct 25 20:25:40 EDT 2011
Thank you Kevin. I've just left a comment.
Kevin Burton wrote:
> https://issues.jboss.org/browse/NETTY-446
>
> created.... should be an easy change.
>
> On Thu, Oct 20, 2011 at 4:38 PM, Trustin Lee <trustin at gmail.com
> <mailto:trustin at gmail.com>> wrote:
>
> I like the idea of limiting the delta between the old and new
> capacity, and such parameter should be part of DynamicBuffer.
> Would you mind if I ask you to file a JIRA issue for it?
>
> Thanks,
> Trustin
>
> --
> Trustin Lee
> Sent with Sparrow <http://www.sparrowmailapp.com/?sig>
>
> On Wednesday, October 19, 2011 at 12:04 PM, Kevin Burton wrote:
>
>> ... more on this.
>>
>> 1. ignore the FIXME in ensureWritableBytes ... or just delete
>> that line if you compile it.
>>
>> 2. this is about 2x slower than a direct heap buffer. The
>> overhead comes from the composite buffer.
>>
>> On Wed, Oct 19, 2011 at 11:31 AM, Kevin Burton
>> <burtonator at gmail.com <mailto:burtonator at gmail.com>> wrote:
>>>
>>> Hey gang.
>>>
>>> I've been working on a map reduce implementation (which I'm
>>> about to OSS) and I've started to use ChannelBuffers extensively
>>> in the framework.
>>>
>>> Anyway... some of the buffers I'm dealing with are hundreds of
>>> MBs... I need extent based extension on some of them because
>>> going from 128MB to 256MB very quick isn't fun when I'm trying
>>> to conserve memory.
>>>
>>> This makes a SMALL change to DynamicChannelBuffer.
>>>
>>> Basically, I give it an initial capacity of say 128MB and then
>>> give it extents of 2MB . When it's out of memory it will
>>> increase capacity by 2MB. ... so worse case scenario I waste
>>> 2MB of memory.
>>>
>>> I could re-implement this VERY easily with say 30 lines of code
>>> if we could change 'buffer' in DynamicChannelBuffer to
>>> 'protected' ....
>>>
>>> If you would accept this change I'll just submit a new patch
>>> changing this to protected and then the new
>>> ExtendedChannelBuffer ...
>>>
>>> Thoughts?
>>>
>>>
>>>
>>> package peregrine.util;
>>>
>>> import java.io <http://java.io>.*;
>>> import java.util.*;
>>>
>>> import org.jboss.netty.buffer.*;
>>>
>>> import java.io.IOException;
>>> import java.io.InputStream;
>>> import java.io.OutputStream;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> import java.nio.ByteBuffer;
>>> import java.nio.ByteOrder;
>>> import java.nio.channels.GatheringByteChannel;
>>> import java.nio.channels.ScatteringByteChannel;
>>>
>>> /**
>>> * A dynamic capacity buffer which increases its capacity as
>>> needed. It is
>>> * recommended to use {@link ChannelBuffers#dynamicBuffer(int)}
>>> instead of
>>> * calling the constructor explicitly.
>>> *
>>> * @author <a href="http://www.jboss.org/netty/">The Netty
>>> Project</a>
>>> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
>>> *
>>> * @version $Rev: 2206M $, $Date: 2010-11-09 15:04:10 +0900
>>> (Tue, 09 Nov 2010) $
>>> *
>>> */
>>> public class ExtendedChannelBuffer extends AbstractChannelBuffer {
>>>
>>> private final ChannelBufferFactory factory;
>>> private final ByteOrder endianness;
>>> private ChannelBuffer buffer;
>>>
>>> private List<ChannelBuffer> extents = new ArrayList();
>>>
>>> private int extentLength;
>>>
>>> private int initialCapacity;
>>> public ExtendedChannelBuffer(int initialCapacity, int
>>> extentLength) {
>>> this(ByteOrder.BIG_ENDIAN, initialCapacity, extentLength);
>>> }
>>>
>>> public ExtendedChannelBuffer(ByteOrder endianness, int
>>> initialCapacity, int extentLength) {
>>> this(endianness, initialCapacity, extentLength,
>>> HeapChannelBufferFactory.getInstance(endianness));
>>> }
>>>
>>> public ExtendedChannelBuffer(ByteOrder endianness, int
>>> initialCapacity, int extentLength, ChannelBufferFactory factory) {
>>> if (extentLength < 0) {
>>> throw new IllegalArgumentException("extentLength: "
>>> + extentLength);
>>> }
>>> if (endianness == null) {
>>> throw new NullPointerException("endianness");
>>> }
>>> if (factory == null) {
>>> throw new NullPointerException("factory");
>>> }
>>> this.factory = factory;
>>> this.endianness = endianness;
>>> this.extentLength = extentLength;
>>>
>>> extend( initialCapacity );
>>> }
>>>
>>> @Override
>>> public void ensureWritableBytes(int minWritableBytes) {
>>>
>>> if (minWritableBytes <= writableBytes()) {
>>> return;
>>> }
>>>
>>> extend( extentLength );
>>>
>>> }
>>>
>>> private void extend( int newExtentLength ) {
>>>
>>> ChannelBuffer newExtent = factory().getBuffer(order(),
>>> newExtentLength);
>>> extents.add( newExtent );
>>>
>>> System.out.printf( "FIXME: allocating extent: %s\n",
>>> newExtentLength );
>>> buffer = new CompositeChannelBuffer( endianness, extents );
>>> }
>>> public ChannelBufferFactory factory() {
>>> return factory;
>>> }
>>>
>>> public ByteOrder order() {
>>> return endianness;
>>> }
>>>
>>> public boolean isDirect() {
>>> return buffer.isDirect();
>>> }
>>>
>>> public int capacity() {
>>> return buffer.capacity();
>>> }
>>>
>>> public boolean hasArray() {
>>> return buffer.hasArray();
>>> }
>>>
>>> public byte[] array() {
>>> return buffer.array();
>>> }
>>>
>>> public int arrayOffset() {
>>> return buffer.arrayOffset();
>>> }
>>>
>>> public byte getByte(int index) {
>>> return buffer.getByte(index);
>>> }
>>>
>>> public short getShort(int index) {
>>> return buffer.getShort(index);
>>> }
>>>
>>> public int getUnsignedMedium(int index) {
>>> return buffer.getUnsignedMedium(index);
>>> }
>>>
>>> public int getInt(int index) {
>>> return buffer.getInt(index);
>>> }
>>>
>>> public long getLong(int index) {
>>> return buffer.getLong(index);
>>> }
>>>
>>> public void getBytes(int index, byte[] dst, int dstIndex,
>>> int length) {
>>> buffer.getBytes(index, dst, dstIndex, length);
>>> }
>>>
>>> public void getBytes(int index, ChannelBuffer dst, int
>>> dstIndex, int length) {
>>> buffer.getBytes(index, dst, dstIndex, length);
>>> }
>>>
>>> public void getBytes(int index, ByteBuffer dst) {
>>> buffer.getBytes(index, dst);
>>> }
>>>
>>> public int getBytes(int index, GatheringByteChannel out, int
>>> length)
>>> throws IOException {
>>> return buffer.getBytes(index, out, length);
>>> }
>>>
>>> public void getBytes(int index, OutputStream out, int length)
>>> throws IOException {
>>> buffer.getBytes(index, out, length);
>>> }
>>>
>>> public void setByte(int index, int value) {
>>> buffer.setByte(index, value);
>>> }
>>>
>>> public void setShort(int index, int value) {
>>> buffer.setShort(index, value);
>>> }
>>>
>>> public void setMedium(int index, int value) {
>>> buffer.setMedium(index, value);
>>> }
>>>
>>> public void setInt(int index, int value) {
>>> buffer.setInt(index, value);
>>> }
>>>
>>> public void setLong(int index, long value) {
>>> buffer.setLong(index, value);
>>> }
>>>
>>> public void setBytes(int index, byte[] src, int srcIndex,
>>> int length) {
>>> buffer.setBytes(index, src, srcIndex, length);
>>> }
>>>
>>> public void setBytes(int index, ChannelBuffer src, int
>>> srcIndex, int length) {
>>> buffer.setBytes(index, src, srcIndex, length);
>>> }
>>>
>>> public void setBytes(int index, ByteBuffer src) {
>>> buffer.setBytes(index, src);
>>> }
>>>
>>> public int setBytes(int index, InputStream in, int length)
>>> throws IOException {
>>> return buffer.setBytes(index, in, length);
>>> }
>>>
>>> public int setBytes(int index, ScatteringByteChannel in, int
>>> length)
>>> throws IOException {
>>> return buffer.setBytes(index, in, length);
>>> }
>>>
>>> @Override
>>> public void writeByte(int value) {
>>> ensureWritableBytes(1);
>>> super.writeByte(value);
>>> }
>>>
>>> @Override
>>> public void writeShort(int value) {
>>> ensureWritableBytes(2);
>>> super.writeShort(value);
>>> }
>>>
>>> @Override
>>> public void writeMedium(int value) {
>>> ensureWritableBytes(3);
>>> super.writeMedium(value);
>>> }
>>>
>>> @Override
>>> public void writeInt(int value) {
>>> ensureWritableBytes(4);
>>> super.writeInt(value);
>>> }
>>>
>>> @Override
>>> public void writeLong(long value) {
>>> ensureWritableBytes(8);
>>> super.writeLong(value);
>>> }
>>>
>>> @Override
>>> public void writeBytes(byte[] src, int srcIndex, int length) {
>>> ensureWritableBytes(length);
>>> super.writeBytes(src, srcIndex, length);
>>> }
>>>
>>> @Override
>>> public void writeBytes(ChannelBuffer src, int srcIndex, int
>>> length) {
>>> ensureWritableBytes(length);
>>> super.writeBytes(src, srcIndex, length);
>>> }
>>>
>>> @Override
>>> public void writeBytes(ByteBuffer src) {
>>> ensureWritableBytes(src.remaining());
>>> super.writeBytes(src);
>>> }
>>>
>>> @Override
>>> public int writeBytes(InputStream in, int length) throws
>>> IOException {
>>> ensureWritableBytes(length);
>>> return super.writeBytes(in, length);
>>> }
>>>
>>> @Override
>>> public int writeBytes(ScatteringByteChannel in, int length)
>>> throws IOException {
>>> ensureWritableBytes(length);
>>> return super.writeBytes(in, length);
>>> }
>>>
>>> @Override
>>> public void writeZero(int length) {
>>> ensureWritableBytes(length);
>>> super.writeZero(length);
>>> }
>>>
>>> public ChannelBuffer duplicate() {
>>> return new DuplicatedChannelBuffer(this);
>>> }
>>>
>>> public ChannelBuffer copy(int index, int length) {
>>> ExtendedChannelBuffer copiedBuffer = new
>>> ExtendedChannelBuffer(order(), Math.max(length, 64),
>>> extentLength, factory());
>>> copiedBuffer.buffer = buffer.copy(index, length);
>>> copiedBuffer.setIndex(0, length);
>>> return copiedBuffer;
>>> }
>>>
>>> public ChannelBuffer slice(int index, int length) {
>>> if (index == 0) {
>>> if (length == 0) {
>>> return ChannelBuffers.EMPTY_BUFFER;
>>> }
>>> return new TruncatedChannelBuffer(this, length);
>>> } else {
>>> if (length == 0) {
>>> return ChannelBuffers.EMPTY_BUFFER;
>>> }
>>> return new SlicedChannelBuffer(this, index, length);
>>> }
>>> }
>>>
>>> public ByteBuffer toByteBuffer(int index, int length) {
>>> return buffer.toByteBuffer(index, length);
>>> }
>>> }
>>>
>>
>> _______________________________________________
>> netty-users mailing list
>> netty-users at lists.jboss.org <mailto:netty-users at lists.jboss.org>
>> https://lists.jboss.org/mailman/listinfo/netty-users
>
>
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org <mailto:netty-users at lists.jboss.org>
> https://lists.jboss.org/mailman/listinfo/netty-users
>
>
>
>
> --
>
> Founder/CEO Spinn3r.com <http://Spinn3r.com>
>
> Location: *San Francisco, CA*
> Skype: *burtonator*
>
> Skype-in: *(415) 871-0687*
>
>
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20111025/c444a505/attachment-0001.html
More information about the netty-users
mailing list