Process different types of messages

Michael McGrady mmcgrady at topiatechnology.com
Mon Sep 14 13:22:18 EDT 2009


I was going to say that this is what KarmaDecoder does.  Here is the  
basic data object used in KarmaDecoder.

/**
  * @author Mike McGrady
  * @version Karma Alpha v001
  * @param <KarmaFileChunk>
  * @since August 2009.
  */
public final class KarmaFileChunk implements Serializable,
		Comparable<KarmaFileChunk> {
	/**
	 *
	 */
	private static final long serialVersionUID = -8062599394269438322L;
	/**
	 * The last chunk of a file.
	 */
	public final boolean isLast;
	/**
	 * The ID of the associated KarmaManager.
	 */
	public final UUID uuid;
	/**
	 * The bytes of the chunk.
	 */
	public final byte[] bytes;
	/**
	 * The sequential ID of the chunk beginning with the integer 1.
	 */
	public final int id;
	/**
	 * The offset after the chunk is added to the file.
	 */
	public final long offset;
	/**
	 * The default size of a chunk.
	 */
	public final int chunkSize;
	/**
	 * The length of the file.
	 */
	public final int fileLength;
	/**
	 * The filename.
	 */
	public final String fileName;

	public final int numberOfChunks;

	public KarmaFileChunk(UUID uuid, final byte[] bytes, final int id,
			final long offset, final int chunkSize, final int fileLength,
			boolean isLast, String fileName) {
		this.uuid = uuid;
		this.bytes = bytes;
		this.id = id;
		this.offset = offset;
		this.chunkSize = chunkSize;
		this.fileLength = fileLength;
		this.isLast = isLast;
		this.fileName = fileName;
		if (!this.isLast) {
			this.numberOfChunks = (fileLength / chunkSize)
					+ (((fileLength % chunkSize) > 0) ? 1 : 0);
		} else {
			this.numberOfChunks = this.id;
		}
	}

	public int compareTo(KarmaFileChunk chunk) {
		int result = -1;
		if (this == chunk) {
			result = -1;
		}
		if (chunk == null) {
			result = -1;
		}
		if (chunk.hashCode() > this.hashCode()) {
			result = +1;
		} else if (chunk.hashCode() < this.hashCode()) {
			result = -1;
		} else if (chunk.equals(this)) {
			result = 0;
		} else {
			// Probably impossible.
			result = -1;
		}

		return result;
	}

	@Override
	public String toString() {
		return this.getClass().getSimpleName() + hashCode()
				+ " \n\tfile uuid: " + uuid + " offset=" + offset
				+ " chunkSize=" + chunkSize + " bytes=" + bytes
				+ " fileLength: " + fileLength + " fileName: " + fileName
				+ " + isLast: " + isLast + " numberOfChunks: " + numberOfChunks
				+ " chunk sequence id (number)=" + id;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + Arrays.hashCode(this.bytes);
		result = prime * result + this.chunkSize;
		result = prime * result + this.fileLength;
		result = prime * result
				+ ((this.fileName == null) ? 0 : this.fileName.hashCode());
		result = prime * result + this.id;
		result = prime * result + (this.isLast ? 1231 : 1237);
		result = prime * result + this.numberOfChunks;
		result = prime * result + (int) (this.offset ^ (this.offset >>> 32));
		result = prime * result
				+ ((this.uuid == null) ? 0 : this.uuid.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null) {
			return false;
		}
		if (!(obj instanceof KarmaFileChunk)) {
			return false;
		}
		KarmaFileChunk other = (KarmaFileChunk) obj;
		if (!Arrays.equals(this.bytes, other.bytes)) {
			return false;
		}
		if (this.chunkSize != other.chunkSize) {
			return false;
		}
		if (this.fileLength != other.fileLength) {
			return false;
		}
		if (this.fileName == null) {
			if (other.fileName != null) {
				return false;
			}
		} else if (!this.fileName.equals(other.fileName)) {
			return false;
		}
		if (this.id != other.id) {
			return false;
		}
		if (this.isLast != other.isLast) {
			return false;
		}
		if (this.numberOfChunks != other.numberOfChunks) {
			return false;
		}
		if (this.offset != other.offset) {
			return false;
		}
		if (this.uuid == null) {
			if (other.uuid != null) {
				return false;
			}
		} else if (!this.uuid.equals(other.uuid)) {
			return false;
		}

		return true;
	}
}


/**
  * <p>
  * Follows in general outline the JBoss Netty <code>ChunkedNioStream</ 
code>.
  * <code>KarmaChunkedInput</code> extends Netty <code>ChunkedInput</ 
code> and
  * adds a close method.
  * </p>
  *
  * @author Mike McGrady
  * @version Karma Alpha v001
  * @since August 2009.
  */
public class KarmaChunkedNioStream implements KarmaChunkedInput {

	private static final InternalLogger logger = InternalLoggerFactory
			.getInstance(KarmaChunkedNioStream.class.getName());
	static final int DEFAULT_CHUNK_SIZE = 8192;
	private final int length;
	private final UUID uuid;
	private final ReadableByteChannel in;
	private int id = 1;
	private final String fileName;

	private int chunkSize;
	private volatile long offset;

	/**
	 * Associated ByteBuffer
	 */
	private ByteBuffer byteBuffer = null;

	/**
	 * Creates a new instance that fetches data from the specified channel.
	 */
	public KarmaChunkedNioStream(int length, ReadableByteChannel in, File  
file,
			UUID uuid) {
		this(length, in, DEFAULT_CHUNK_SIZE, file, uuid);
	}

	/**
	 * @param length
	 * @param in
	 * @param chunkSize
	 * @param file
	 * @param uuid
	 *            <code>KarmaManager</code> ID.
	 */
	public KarmaChunkedNioStream(int length, ReadableByteChannel in,
			int chunkSize, File file, UUID uuid) {
		this.length = length;
		this.uuid = uuid;
		if (in == null) {
			throw new NullPointerException("in");
		}
		if (chunkSize <= 0) {
			throw new IllegalArgumentException("chunkSize: " + chunkSize
					+ " (expected: a positive integer)");
		}
		this.in = in;
		offset = 0;
		this.chunkSize = chunkSize;
		if (byteBuffer != null) {
			if (byteBuffer.capacity() != chunkSize) {
				byteBuffer = null;
				byteBuffer = ByteBuffer.allocate(chunkSize);
			}
		} else {
			byteBuffer = ByteBuffer.allocate(chunkSize);
		}

		this.fileName = file.getName();
	}

	/**
	 * Returns the number of transferred bytes.
	 */
	public long getTransferredBytes() {
		return offset;
	}

	public boolean hasNextChunk() {
		if (byteBuffer.position() > 0) {
			// A previous read was not over, so there is a next chunk in the
			// buffer at least
			return true;
		}
		if (in.isOpen()) {
			// Try to read a new part, and keep this part (no rewind)
			int b = -1;
			try {
				b = in.read(byteBuffer);
			} catch (IOException e) {
				logger.log(InternalLogLevel.ERROR, e.getMessage());
				e.printStackTrace();
			}
			if (b < 0) {
				return false;
			} else {
				offset += b;
				return true;
			}
		}
		return false;
	}

	public void close() {
		try {
			in.close();
		} catch (IOException e) {
			logger.log(InternalLogLevel.ERROR, e.getMessage());
			e.printStackTrace();
		}
	}

	public KarmaFileChunk nextChunk() {
		if (!hasNextChunk()) {
			return null;
		}
		// buffer cannot be empty from there
		int readBytes = byteBuffer.position();
		for (;;) {
			int localReadBytes = -1;
			try {
				localReadBytes = in.read(byteBuffer);
			} catch (IOException e) {
				logger.log(InternalLogLevel.ERROR, e.getMessage());
				e.printStackTrace();
			}

			if (localReadBytes < 0) {
				break;
			}

			readBytes += localReadBytes;
			offset += localReadBytes;

			if (readBytes == chunkSize) {
				break;
			}

		}

		byteBuffer.flip();

		int lim = byteBuffer.limit();

		byte[] chunk = null;

		if (lim == DEFAULT_CHUNK_SIZE) {
			chunk = byteBuffer.array();
		} else {
			chunk = new byte[lim];
			byteBuffer.get(chunk);
		}

		byteBuffer.clear();
		KarmaFileChunk karmaChunk = new KarmaFileChunk(this.uuid, chunk, id++,
				this.offset, chunkSize = lim, length,
				((offset == length) ? true : false), fileName);

		return karmaChunk;
	}
}

On Sep 14, 2009, at 9:44 AM, bgoetzmann wrote:

>
> I have just tried the approach I spoke about, and it works.
> Netty is really powerful!
>
> Cheers,
>
> Bertrand.
>
>
> bgoetzmann wrote:
>>
>> Hello,
>>
>> In my Netty project, whose goal is to transfer large files from a  
>> client to
>> a server, I used a ChunkedWriteHandler instance and a custom client
>> handler that uses the ChunkedFile class in order to send a file  
>> piece by
>> piece. All is OK!
>>
>> The next step for me would be to send first some information on the  
>> file
>> that is about to be sent (name, size, etc.). So I need to send a POJO
>> instance.
>> What is the approach I can take? Can you tell me if this is a valid
>> approach, having a POJO that holds the information I want on the  
>> file:
>>
>> * On the client side:
>> - when connected, add a ObjectEncoder dynamically in the pipeline  
>> (or have
>> it already in the pipeline)
>> - populates the POJO and send it to server
>> - remove the ObjectEncoder
>> - add the ChunkedWriteHandler in the pipeline
>> - send the file
>>
>> * On the server side:
>> - when connected, add a ObjectDecoder dynamically in the pipeline  
>> (or have
>> it already in the pipeline)
>> - receive the POJO and process its data
>> - remove the ObjectDecoder
>> - add a custom handler to handle messages containing file pieces
>>
>> Is there another way to handle such a scenario? Thank you for any  
>> help!
>>
>>
>> Bertrand.
>>
>>
>
> -- 
> View this message in context: http://n2.nabble.com/Process-different-types-of-messages-tp3643445p3643699.html
> Sent from the Netty User Group mailing list archive at Nabble.com.
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users

Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc
1.253.720.3365
mmcgrady at topiatechnology.com







-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20090914/6b2507d5/attachment-0001.html 


More information about the netty-users mailing list