Process different types of messages

Michael McGrady mmcgrady at topiatechnology.com
Mon Sep 14 14:57:51 EDT 2009


Hi, Bertrand,

KarmaFileChunk is not similar to Netty's ChunkedFile.  I probably  
should have named it KarmaChunk and may change it to that.

Rather, KarmaFileChunk is similar to the use of a ChannelBuffer in
Netty's ChunkedFile and ChunkedNioStream.  In Netty the data is held
in the ChannelBuffer and is not instrumented; as you can see,
KarmaFileChunk is highly instrumented.  The reason is that we want to
be able to use KarmaFileChunk in situations where delivery is
independent of the original sender's connections with receivers, so
we have to keep track of what has been transferred before a
connection is broken.
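To illustrate the bookkeeping (a minimal sketch with hypothetical names, not the actual Karma API): because every chunk carries its 1-based sequence id and the expected total, a receiver can determine exactly which chunks are still outstanding after a dropped connection and ask the sender for only those.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of receiver-side resume bookkeeping; not the Karma API.
class ResumeTracker {
	private final int numberOfChunks;              // taken from chunk metadata
	private final Set<Integer> received = new TreeSet<>();

	ResumeTracker(int numberOfChunks) {
		this.numberOfChunks = numberOfChunks;
	}

	// Record a delivered chunk by its sequential id (1-based, as in KarmaFileChunk).
	void record(int chunkId) {
		received.add(chunkId);
	}

	// Ids still missing; after a reconnect, only these need to be resent.
	Set<Integer> missing() {
		Set<Integer> gaps = new TreeSet<>();
		for (int id = 1; id <= numberOfChunks; id++) {
			if (!received.contains(id)) {
				gaps.add(id);
			}
		}
		return gaps;
	}

	boolean complete() {
		return missing().isEmpty();
	}
}
```

Because the state lives entirely in the chunk metadata, the tracker survives the original connection going away.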

We do have plans to open source the project of which KarmaFileChunk is  
a part.  Those plans have not been finalized and are in concert with  
IEEE standards.

I hope this helps.  If you look at the KarmaDecoder, the rest of the
application can easily be inferred.  Essentially the sender uses
ChannelFutures in conjunction with Object monitors to make sure that
certain parts of multiple objects are sent in a particular order.
Thus, for KarmaDecoder, we make sure that a KarmaManager with the
same UUID as the KarmaFileChunks precedes the sending of the chunks;
this can cover any number of files as well as any number of file
chunks.
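That ordering guarantee can be sketched with stdlib stand-ins (hypothetical names throughout; CompletableFuture stands in for Netty's ChannelFuture, and plain strings stand in for the KarmaManager and its chunks): the sender gates chunk writes on completion of the manager's write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the sender-side ordering, not the Karma code itself.
class OrderedSender {
	final List<String> wire = new ArrayList<>();   // what actually went out, in order

	// "Write" an object; completion of the returned future means it was flushed.
	CompletableFuture<Void> write(String msg) {
		synchronized (wire) {
			wire.add(msg);
		}
		return CompletableFuture.completedFuture(null);
	}

	// Send the manager first, then block chunk sending until that write completes,
	// so chunks can never precede their manager on the wire.
	void send(String manager, List<String> chunks) {
		write(manager).join();
		for (String chunk : chunks) {
			write(chunk);
		}
	}
}
```

In real Netty the write is asynchronous and the wait would hang off a ChannelFuture listener; the sketch only models the gating.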

I prefer streaming of files to the random access approach.
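For contrast, here is what the two approaches look like in plain NIO (nothing Karma-specific): streaming consumes the channel front to back in fixed-size pieces, while random access fetches an arbitrary region by absolute position.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Plain-NIO contrast between streaming and random-access reads.
class ReadStyles {
	// Streaming: consume the file sequentially in fixed-size chunks.
	// (Per-chunk UTF-8 decode is safe here only for ASCII data.)
	static String readStreaming(Path p, int chunkSize) throws IOException {
		StringBuilder out = new StringBuilder();
		try (FileChannel ch = FileChannel.open(p, StandardOpenOption.READ)) {
			ByteBuffer buf = ByteBuffer.allocate(chunkSize);
			while (ch.read(buf) != -1) {
				buf.flip();
				out.append(StandardCharsets.UTF_8.decode(buf));
				buf.clear();
			}
		}
		return out.toString();
	}

	// Random access: positional read; the channel's own position is untouched.
	static String readAt(Path p, long position, int len) throws IOException {
		try (FileChannel ch = FileChannel.open(p, StandardOpenOption.READ)) {
			ByteBuffer buf = ByteBuffer.allocate(len);
			ch.read(buf, position);
			buf.flip();
			return StandardCharsets.UTF_8.decode(buf).toString();
		}
	}
}
```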

Mike


On Sep 14, 2009, at 11:35 AM, Bertrand Goetzmann wrote:

> Hello Michael,
>
> Is KarmaFileChunk similar to Netty's ChunkedFile?
> Is Karma an open source project based on Netty? Where can I
> find a description, examples, and so on, for Karma?
>
> Thank you for your help.
>
> Bertrand.
>
> 2009/9/14 Michael McGrady <mmcgrady at topiatechnology.com>
> I was going to say that this is what KarmaDecoder does.  Here is the  
> basic data object used in KarmaDecoder.
>
> /**
>  * A single instrumented chunk of a file in transit.
>  *
>  * @author Mike McGrady
>  * @version Karma Alpha v001
>  * @since August 2009
>  */
> public final class KarmaFileChunk implements Serializable,
> 		Comparable<KarmaFileChunk> {
> 	/**
> 	 *
> 	 */
> 	private static final long serialVersionUID = -8062599394269438322L;
> 	/**
> 	 * The last chunk of a file.
> 	 */
> 	public final boolean isLast;
> 	/**
> 	 * The ID of the associated KarmaManager.
> 	 */
> 	public final UUID uuid;
> 	/**
> 	 * The bytes of the chunk.
> 	 */
> 	public final byte[] bytes;
> 	/**
> 	 * The sequential ID of the chunk beginning with the integer 1.
> 	 */
> 	public final int id;
> 	/**
> 	 * The offset after the chunk is added to the file.
> 	 */
> 	public final long offset;
> 	/**
> 	 * The default size of a chunk.
> 	 */
> 	public final int chunkSize;
> 	/**
> 	 * The length of the file.
> 	 */
> 	public final int fileLength;
> 	/**
> 	 * The filename.
> 	 */
> 	public final String fileName;
>
> 	/**
> 	 * The total number of chunks in the file.
> 	 */
> 	public final int numberOfChunks;
>
> 	public KarmaFileChunk(UUID uuid, final byte[] bytes, final int id,
> 			final long offset, final int chunkSize, final int fileLength,
> 			boolean isLast, String fileName) {
> 		this.uuid = uuid;
> 		this.bytes = bytes;
> 		this.id = id;
> 		this.offset = offset;
> 		this.chunkSize = chunkSize;
> 		this.fileLength = fileLength;
> 		this.isLast = isLast;
> 		this.fileName = fileName;
> 		if (!this.isLast) {
> 			// Ceiling division: one extra chunk for any remainder.
> 			this.numberOfChunks = (fileLength / chunkSize)
> 					+ (((fileLength % chunkSize) > 0) ? 1 : 0);
> 		} else {
> 			// The last chunk's sequential id is the total chunk count.
> 			this.numberOfChunks = this.id;
> 		}
> 	}
>
> 	public int compareTo(KarmaFileChunk chunk) {
> 		if (chunk == null) {
> 			// Comparable contract: comparing to null throws.
> 			throw new NullPointerException("chunk");
> 		}
> 		if (this == chunk || this.equals(chunk)) {
> 			return 0;
> 		}
> 		// Arbitrary but consistent ordering based on hash codes.
> 		if (chunk.hashCode() > this.hashCode()) {
> 			return 1;
> 		}
> 		if (chunk.hashCode() < this.hashCode()) {
> 			return -1;
> 		}
> 		// Unequal objects with colliding hash codes; pick a fixed sign.
> 		return -1;
> 	}
>
> 	@Override
> 	public String toString() {
> 		return this.getClass().getSimpleName() + hashCode()
> 				+ " \n\tfile uuid: " + uuid + " offset=" + offset
> 				+ " chunkSize=" + chunkSize + " bytes.length=" + bytes.length
> 				+ " fileLength: " + fileLength + " fileName: " + fileName
> 				+ " isLast: " + isLast + " numberOfChunks: " + numberOfChunks
> 				+ " chunk sequence id (number)=" + id;
> 	}
>
> 	@Override
> 	public int hashCode() {
> 		final int prime = 31;
> 		int result = 1;
> 		result = prime * result + Arrays.hashCode(this.bytes);
> 		result = prime * result + this.chunkSize;
> 		result = prime * result + this.fileLength;
> 		result = prime * result
> 				+ ((this.fileName == null) ? 0 : this.fileName.hashCode());
> 		result = prime * result + this.id;
> 		result = prime * result + (this.isLast ? 1231 : 1237);
> 		result = prime * result + this.numberOfChunks;
> 		result = prime * result + (int) (this.offset ^ (this.offset >>> 32));
> 		result = prime * result
> 				+ ((this.uuid == null) ? 0 : this.uuid.hashCode());
> 		return result;
> 	}
>
> 	@Override
> 	public boolean equals(Object obj) {
> 		if (this == obj) {
> 			return true;
> 		}
> 		if (obj == null) {
> 			return false;
> 		}
> 		if (!(obj instanceof KarmaFileChunk)) {
> 			return false;
> 		}
> 		KarmaFileChunk other = (KarmaFileChunk) obj;
> 		if (!Arrays.equals(this.bytes, other.bytes)) {
> 			return false;
> 		}
> 		if (this.chunkSize != other.chunkSize) {
> 			return false;
> 		}
> 		if (this.fileLength != other.fileLength) {
> 			return false;
> 		}
> 		if (this.fileName == null) {
> 			if (other.fileName != null) {
> 				return false;
> 			}
> 		} else if (!this.fileName.equals(other.fileName)) {
> 			return false;
> 		}
> 		if (this.id != other.id) {
> 			return false;
> 		}
> 		if (this.isLast != other.isLast) {
> 			return false;
> 		}
> 		if (this.numberOfChunks != other.numberOfChunks) {
> 			return false;
> 		}
> 		if (this.offset != other.offset) {
> 			return false;
> 		}
> 		if (this.uuid == null) {
> 			if (other.uuid != null) {
> 				return false;
> 			}
> 		} else if (!this.uuid.equals(other.uuid)) {
> 			return false;
> 		}
>
> 		return true;
> 	}
> }
>
>
> /**
>  * <p>
>  * Follows in general outline the JBoss Netty <code>ChunkedNioStream</code>.
>  * <code>KarmaChunkedInput</code> extends Netty's <code>ChunkedInput</code>
>  * and adds a close method.
>  * </p>
>  *
>  * @author Mike McGrady
>  * @version Karma Alpha v001
>  * @since August 2009.
>  */
> public class KarmaChunkedNioStream implements KarmaChunkedInput {
>
> 	private static final InternalLogger logger = InternalLoggerFactory
> 			.getInstance(KarmaChunkedNioStream.class.getName());
> 	static final int DEFAULT_CHUNK_SIZE = 8192;
> 	private final int length;
> 	private final UUID uuid;
> 	private final ReadableByteChannel in;
> 	private int id = 1;
> 	private final String fileName;
>
> 	private int chunkSize;
> 	private volatile long offset;
>
> 	/**
> 	 * Associated ByteBuffer
> 	 */
> 	private ByteBuffer byteBuffer = null;
>
> 	/**
> 	 * Creates a new instance that fetches data from the specified channel.
> 	 */
> 	public KarmaChunkedNioStream(int length, ReadableByteChannel in,
> 			File file, UUID uuid) {
> 		this(length, in, DEFAULT_CHUNK_SIZE, file, uuid);
> 	}
>
> 	/**
> 	 * @param length
> 	 * @param in
> 	 * @param chunkSize
> 	 * @param file
> 	 * @param uuid
> 	 *            <code>KarmaManager</code> ID.
> 	 */
> 	public KarmaChunkedNioStream(int length, ReadableByteChannel in,
> 			int chunkSize, File file, UUID uuid) {
> 		this.length = length;
> 		this.uuid = uuid;
> 		if (in == null) {
> 			throw new NullPointerException("in");
> 		}
> 		if (chunkSize <= 0) {
> 			throw new IllegalArgumentException("chunkSize: " + chunkSize
> 					+ " (expected: a positive integer)");
> 		}
> 		this.in = in;
> 		offset = 0;
> 		this.chunkSize = chunkSize;
> 		// The field is always null at this point, so allocate unconditionally.
> 		byteBuffer = ByteBuffer.allocate(chunkSize);
>
> 		this.fileName = file.getName();
> 	}
>
> 	/**
> 	 * Returns the number of transferred bytes.
> 	 */
> 	public long getTransferredBytes() {
> 		return offset;
> 	}
>
> 	public boolean hasNextChunk() {
> 		if (byteBuffer.position() > 0) {
> 			// A previous read was not over, so there is a next chunk in the
> 			// buffer at least
> 			return true;
> 		}
> 		if (in.isOpen()) {
> 			// Try to read a new part, and keep this part (no rewind)
> 			int b = -1;
> 			try {
> 				b = in.read(byteBuffer);
> 			} catch (IOException e) {
> 				logger.log(InternalLogLevel.ERROR, e.getMessage());
> 				e.printStackTrace();
> 			}
> 			if (b < 0) {
> 				return false;
> 			} else {
> 				offset += b;
> 				return true;
> 			}
> 		}
> 		return false;
> 	}
>
> 	public void close() {
> 		try {
> 			in.close();
> 		} catch (IOException e) {
> 			logger.log(InternalLogLevel.ERROR, e.getMessage());
> 			e.printStackTrace();
> 		}
> 	}
>
> 	public KarmaFileChunk nextChunk() {
> 		if (!hasNextChunk()) {
> 			return null;
> 		}
> 		// buffer cannot be empty from there
> 		int readBytes = byteBuffer.position();
> 		for (;;) {
> 			int localReadBytes = -1;
> 			try {
> 				localReadBytes = in.read(byteBuffer);
> 			} catch (IOException e) {
> 				logger.log(InternalLogLevel.ERROR, e.getMessage());
> 				e.printStackTrace();
> 			}
>
> 			if (localReadBytes < 0) {
> 				break;
> 			}
>
> 			readBytes += localReadBytes;
> 			offset += localReadBytes;
>
> 			if (readBytes == chunkSize) {
> 				break;
> 			}
>
> 		}
>
> 		byteBuffer.flip();
>
> 		int lim = byteBuffer.limit();
>
> 		// Always copy the bytes: the buffer is cleared and reused for the
> 		// next read, so handing out its backing array would let later reads
> 		// overwrite a chunk that has already been emitted.
> 		byte[] chunk = new byte[lim];
> 		byteBuffer.get(chunk);
>
> 		byteBuffer.clear();
> 		KarmaFileChunk karmaChunk = new KarmaFileChunk(this.uuid, chunk,
> 				id++, this.offset, lim, length, offset == length, fileName);
>
> 		return karmaChunk;
> 	}
> }
>
> On Sep 14, 2009, at 9:44 AM, bgoetzmann wrote:
>
>>
>> I just tried the approach I spoke about, and it works.
>> Netty is really powerful!
>>
>> Cheers,
>>
>> Bertrand.
>>
>>
>> bgoetzmann wrote:
>>>
>>> Hello,
>>>
>>> In my Netty project, whose goal is to transfer a large file from a
>>> client to a server, I used a ChunkedWriteHandler instance and a
>>> custom client handler that uses the ChunkedFile class in order to
>>> send a file piece by piece. All is OK!
>>>
>>> The next step for me would be to first send some information about
>>> the file that is about to be sent (name, size, etc.), so I need to
>>> send a POJO instance.
>>> What approach can I take? Can you tell me if the following is valid,
>>> having a POJO that holds the information I want about the file:
>>>
>>> * On the client side:
>>> - when connected, add an ObjectEncoder dynamically to the pipeline
>>> (or have it already in the pipeline)
>>> - populate the POJO and send it to the server
>>> - remove the ObjectEncoder
>>> - add the ChunkedWriteHandler to the pipeline
>>> - send the file
>>>
>>> * On the server side:
>>> - when connected, add an ObjectDecoder dynamically to the pipeline
>>> (or have it already in the pipeline)
>>> - receive the POJO and process its data
>>> - remove the ObjectDecoder
>>> - add a custom handler to handle messages containing file pieces
>>>
>>> Is there another way to handle such a scenario? Thank you for any
>>> help!
>>>
>>>
>>> Bertrand.
>>>
>>>
>>
>> -- 
>> View this message in context: http://n2.nabble.com/Process-different-types-of-messages-tp3643445p3643699.html
>> Sent from the Netty User Group mailing list archive at Nabble.com.
>> _______________________________________________
>> netty-users mailing list
>> netty-users at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/netty-users
>
> Mike McGrady
> Principal Investigator AF081-028 AFRL SBIR
> Senior Engineer
> Topia Technology, Inc
> 1.253.720.3365
> mmcgrady at topiatechnology.com
>

Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com