Streaming to file

Mike McGrady mmcgrady at topiatechnology.com
Fri Nov 6 13:10:42 EST 2009


This is more than you need, but it should help: see the code at the end
of this email.  This particular implementation decodes a set of files
and a management object, where each file is sent in chunks.  A
FileChannel is obtained from a FileOutputStream and the bytes are
written to it, cf. processKarmaFileChunk(...).  The file channel is kept
until all chunks for a particular file have been written.  The
management object, which is sent as well and records where the received
files are, is then passed to the application.
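
A minimal sketch of that pattern using only java.nio, so that it runs
without Netty or Karma on the classpath.  The class name ChunkFileWriter
and the parameters of write(...) are made up for illustration and are
not part of Karma.  It keeps one FileChannel per transfer id, appends
each chunk, and closes the channel after the last chunk:

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper, not Karma code: one open FileChannel per transfer id.
final class ChunkFileWriter {
    private final Map<UUID, FileChannel> writers = new HashMap<UUID, FileChannel>();

    /** Appends one chunk; opens the file on the first chunk, closes it on the last. */
    void write(UUID id, String fileName, byte[] bytes, boolean first, boolean last)
            throws IOException {
        if (first) {
            writers.put(id, new FileOutputStream(fileName).getChannel());
        }
        FileChannel channel = writers.get(id);
        if (channel == null) {
            throw new IOException("No open file for transfer " + id);
        }
        // wrap() avoids a copy; loop because one write() may consume only part.
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        if (last) {
            writers.remove(id).close();
        }
    }

    /** Number of transfers whose file is still open. */
    int openFiles() {
        return writers.size();
    }
}
```

With Netty the byte array would come from the ChannelBuffer, for example
by copying its readable bytes the way convertChannelBuffer does.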

As you can see, the ChannelBuffer contains only a byte array, which is
processed in convertChannelBuffer.  Hope this helps.
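
A rough sketch of that conversion step, with two loud assumptions:
KarmaUtility.toObject is not shown in this email, so plain Java
serialization is assumed here, and a java.nio.ByteBuffer stands in for
the Netty ChannelBuffer so the example runs on the JDK alone.  The class
BufferToObject is hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;

// Hypothetical stand-in for convertChannelBuffer; assumes Java serialization.
final class BufferToObject {
    /** Copies the remaining bytes out of the buffer and deserializes them. */
    static Object convert(ByteBuffer buffer) throws IOException, ClassNotFoundException {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes); // advances the buffer position past the copied bytes
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            return in.readObject();
        } finally {
            in.close();
        }
    }

    /** Serializes an object to bytes, the inverse of convert. */
    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ObjectOutputStream objects = new ObjectOutputStream(out);
        objects.writeObject(value);
        objects.close();
        return out.toByteArray();
    }
}
```

The decoder then dispatches on the type of the returned object (chunk or
manager), as decode(...) does in the attached class.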

Mike

P.S.  Karma is a mobile object application, as contrasted with a mobile
agent application, with interfaces that we hope will assist in
providing a new IEEE standard that will complement FIPA.  We hope to
make it public soon and then to begin the IEEE process.  Netty is the
default transport.


On Nov 6, 2009, at 8:57 AM, Hoyt, David wrote:

> What’s the best way to get a ChannelBuffer into a file? I just want  
> to dump the incoming bytes into a file.
>
> I’m using NIO and I have a FileChannel and I’m just calling  
> "myFileChannel.write(myChannelBuffer.toByteBuffer());  
> myFileChannel.flush();" – is that right? Is there a better way?
>
> Is there a way to have my incoming ChannelBuffers automagically be a  
> ByteBuffer without having additional overhead of  
> calling .toByteBuffer() and risking byte copying?
>
> Thanks,
> - David Hoyt
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users

Mike McGrady
Principal Investigator AF081-028 AFRL SBIR
Senior Engineer
Topia Technology, Inc.
1.253.720.3365
mmcgrady at topiatechnology.com



/*
 * KARMA is free software: you can redistribute it and/or modify it under the
 * terms of the GNU LESSER GENERAL PUBLIC LICENSE as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version. KARMA is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU LESSER GENERAL PUBLIC LICENSE
 * for more details. You should have received a copy of the GNU LESSER GENERAL
 * PUBLIC LICENSE along with KARMA. If not, see <http://www.gnu.org/licenses/>.
 */
package net.sourceforge.karma.spi.transport.netty;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.cert.Certificate;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;

import net.sourceforge.karma.KarmaConstant;
import net.sourceforge.karma.KarmaManager;
import net.sourceforge.karma.spi.KarmaReceiver;
import net.sourceforge.karma.spi.util.KarmaUtility;
import net.sourceforge.karma.spi.util.NettyKarmaSslHandshake;
import net.sourceforge.karma.spi.util.StackTrace;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;

/**
 * <p>
 * A composition-based (rather than inheritance-based) design, a la Joshua
 * Bloch, for using the JBoss Netty simple channel upstream and downstream
 * handlers with a KARMA decoder.
 * </p>
 * <p>
 * The principal method is <code>messageReceivedUpstream</code>.
 * </p>
 * <p>
 * The <code>KarmaReceiverPipelineFactory</code> utilizes this class along with
 * the standard JBoss Netty <code>SslHandler</code> and
 * <code>SslBufferPool</code>. Where the event is a
 * <code>ChannelStateEvent</code> and the channel state is
 * <code>CONNECTED</code>, the <code>handleUpstream</code> method initiates a
 * handshake with the client on the other end of the channel. The
 * <code>handshake</code> method adds a listener (<code>ConnectionNotice</code>,
 * which implements <code>ChannelFutureListener</code>) to notify the client
 * when the handshake is successful.
 * </p>
 * <p>
 * All <code>OPEN</code> channels are added to the field
 * <code>allChannels</code> to make sure that shutdown is complete and
 * efficient.
 * </p>
 *
 * @author Mike McGrady
 * @version Karma Alpha v001
 * @since August 2009.
 */
@ChannelPipelineCoverage("all")
public final class KarmaDecoder implements ChannelUpstreamHandler,
		ChannelDownstreamHandler, Comparable<KarmaDecoder> {
	private static int managersCount;
	private static int channelCount;
	String errors;
	private static final AtomicInteger chunkCounter = new AtomicInteger(0);
	private static final InternalLogger logger = InternalLoggerFactory
			.getInstance(KarmaDecoder.class.getName());
	private static final AtomicInteger managerCounter = new AtomicInteger(0);
	private static final AtomicInteger readCounter = new AtomicInteger(0);
	private final ChannelGroup allChannels;

	private final Map<UUID, Integer> fileCount = new HashMap<UUID, Integer>();
	private final Map<UUID, FileChannel> fileWriters = new HashMap<UUID, FileChannel>();
	private final Map<UUID, KarmaManager> managers = new HashMap<UUID, KarmaManager>();
	private final KarmaReceiver receiver;

	private final SimpleChannelDownstreamHandler scdh = new SimpleChannelDownstreamHandler();

	private final SimpleChannelUpstreamHandler scuh = new SimpleChannelUpstreamHandler();

	/**
	 * This listener follows the functionality for the JBoss Netty chat example
	 * for the most part. This is just a standard use of the
	 * <code>ChannelFutureListener</code>.
	 *
	 * @author Mike McGrady
	 */
	private final class ConnectionNotice implements ChannelFutureListener {

		private final KarmaSslHandler sslHandler;

		ConnectionNotice(final KarmaSslHandler sslHandler) {
			this.sslHandler = sslHandler;
		}

		public void operationComplete(final ChannelFuture future) {
			KarmaDecoder.log(InternalLogLevel.INFO, "SslHandler: " //$NON-NLS-1$
					+ this.sslHandler);
			final HandshakeStatus hs = this.sslHandler.getEngine()
					.getHandshakeStatus();
			final SSLSession session = this.sslHandler.getEngine().getSession();

			KarmaDecoder.log(InternalLogLevel.INFO, "HandshakeStatus: " + hs); //$NON-NLS-1$
			KarmaDecoder.log(InternalLogLevel.INFO, "SSLSession: " + session); //$NON-NLS-1$
			final Certificate[] localCerts = session.getLocalCertificates();
			Certificate[] peerCerts = null;

			try {
				peerCerts = session.getPeerCertificates();
			} catch (final SSLPeerUnverifiedException e) {
				KarmaDecoder.this.errors = "\n\t\tERROR MESSAGE: " //$NON-NLS-1$
						+ e.getMessage() + "\n\t\tERROR STACK TRACE: " //$NON-NLS-1$
						+ StackTrace.trace(e);
				log(InternalLogLevel.INFO, KarmaDecoder.this.errors);
			}

			// getLocalCertificates() returns null when no certificate was sent.
			if (localCerts != null) {
				for (final Certificate localCert : localCerts) {
					log(InternalLogLevel.INFO, "" + localCert); //$NON-NLS-1$
				}
			}

			if (peerCerts != null) {
				// Log each peer certificate.
				for (final Certificate peerCert : peerCerts) {
					log(InternalLogLevel.INFO, "" + peerCert); //$NON-NLS-1$
				}
			}

			// Report the handshake outcome, plus any recorded errors, to the
			// peer. Success and failure differ only in the flag.
			final byte[] protectedSession = KarmaUtility
					.toBytes(new NettyKarmaSslHandshake(future.isSuccess(),
							KarmaDecoder.this.errors));
			final ChannelBuffer protectedSessionBuf = ChannelBuffers
					.wrappedBuffer(protectedSession);
			future.getChannel().write(protectedSessionBuf);
		}
	}

	/**
	 * Creates a new decoder that passes completed transfers to the given
	 * receiver and registers every open channel in the given channel group.
	 */
	public KarmaDecoder(final KarmaReceiver receiver,
			final ChannelGroup allChannels) {
		this.receiver = receiver;
		this.allChannels = allChannels;
	}

	public void channelOpenUpstream(final ChannelHandlerContext ctx,
			final ChannelStateEvent event) {
		KarmaDecoder.channelCount++;
		// Add all open channels to the global group so that they are able to
		// be closed on shutdown.
		final Channel channel = event.getChannel();
		this.allChannels.add(channel);
		ctx.sendUpstream(event);
	}

	public int compareTo(final KarmaDecoder decoder) {
		// Comparable requires a NullPointerException for a null argument.
		if (decoder == null) {
			throw new NullPointerException("decoder"); //$NON-NLS-1$
		}
		if (this == decoder) {
			return 0;
		}

		// Order by hash code: a larger argument hash yields +1.
		final int thisHash = this.hashCode();
		final int otherHash = decoder.hashCode();
		if (otherHash > thisHash) {
			return +1;
		} else if (otherHash < thisHash) {
			return -1;
		}

		// Equal hashes but unequal objects is probably impossible.
		return decoder.equals(this) ? 0 : -1;
	}

	@Override
	public boolean equals(final Object obj) {
		if (this == obj) { return true; }
		if (obj == null) { return false; }
		if (!(obj instanceof KarmaDecoder)) { return false; }
		final KarmaDecoder other = (KarmaDecoder) obj;
		if (this.allChannels == null) {
			if (other.allChannels != null) { return false; }
		} else if (!this.allChannels.equals(other.allChannels)) { return false; }
		if (this.fileCount == null) {
			if (other.fileCount != null) { return false; }
		} else if (!this.fileCount.equals(other.fileCount)) { return false; }
		if (this.fileWriters == null) {
			if (other.fileWriters != null) { return false; }
		} else if (!this.fileWriters.equals(other.fileWriters)) { return false; }
		if (this.managers == null) {
			if (other.managers != null) { return false; }
		} else if (!this.managers.equals(other.managers)) { return false; }
		if (this.receiver == null) {
			if (other.receiver != null) { return false; }
		} else if (!this.receiver.equals(other.receiver)) { return false; }
		if (this.scdh == null) {
			if (other.scdh != null) { return false; }
		} else if (!this.scdh.equals(other.scdh)) { return false; }
		if (this.scuh == null) {
			if (other.scuh != null) { return false; }
		} else if (!this.scuh.equals(other.scuh)) { return false; }
		return true;
	}

	/**
	 * @return the allChannels
	 */
	public ChannelGroup getAllChannels() {
		return this.allChannels;
	}

	/**
	 * {@inheritDoc} Down-casts the received downstream event into more
	 * meaningful sub-type event and calls an appropriate handler method with
	 * the down-casted event.
	 */
	public void handleDownstream(final ChannelHandlerContext ctx,
			final ChannelEvent channelEvent) {
		if (channelEvent instanceof MessageEvent) {
			try {
				this.scdh.writeRequested(ctx, (MessageEvent) channelEvent);
			} catch (final Exception e) {
				KarmaDecoder.log(InternalLogLevel.ERROR, e.getMessage());
				e.printStackTrace();
			}
		} else if (channelEvent instanceof ChannelStateEvent) {
			final ChannelStateEvent channelStateEvent = (ChannelStateEvent) channelEvent;
			switch (channelStateEvent.getState()) {
				case OPEN:
					if (!Boolean.TRUE.equals(channelStateEvent.getValue())) {
						try {
							this.scdh.closeRequested(ctx, channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					}
					break;
				case BOUND:
					if (channelStateEvent.getValue() != null) {
						try {
							this.scdh.bindRequested(ctx, channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					} else {
						try {
							this.scdh.unbindRequested(ctx, channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					}
					break;
				case CONNECTED:
					if (channelStateEvent.getValue() != null) {
						try {
							this.scdh.connectRequested(ctx, channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					} else {
						try {
							this.scdh.disconnectRequested(ctx,
									channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					}
					break;
				case INTEREST_OPS:
					try {
						this.scdh.setInterestOpsRequested(ctx,
								channelStateEvent);
					} catch (final Exception e) {
						KarmaDecoder
								.log(InternalLogLevel.ERROR, e.getMessage());
						e.printStackTrace();
					}
					break;
				default:
					ctx.sendDownstream(channelEvent);
			}
		} else {
			ctx.sendDownstream(channelEvent);
		}
	}

	/**
	 * Down-casts the received upstream event into more meaningful sub-type
	 * event and calls an appropriate handler method with the down-casted
	 * event.
	 */
	public void handleUpstream(final ChannelHandlerContext ctx,
			final ChannelEvent channelEvent) {
		if (channelEvent instanceof MessageEvent) {
			this.messageReceivedUpstream((MessageEvent) channelEvent);
		} else if (channelEvent instanceof WriteCompletionEvent) {
			final WriteCompletionEvent writeCompletionEvent = (WriteCompletionEvent) channelEvent;
			try {
				this.scuh.writeComplete(ctx, writeCompletionEvent);
			} catch (final Exception e) {
				KarmaDecoder.log(InternalLogLevel.ERROR, e.getMessage());
				e.printStackTrace();
			}
		} else if (channelEvent instanceof ChildChannelStateEvent) {
			final ChildChannelStateEvent childChannelStateEvent = (ChildChannelStateEvent) channelEvent;
			if (childChannelStateEvent.getChildChannel().isOpen()) {
				try {
					this.scuh.childChannelOpen(ctx, childChannelStateEvent);
				} catch (final Exception e) {
					KarmaDecoder.log(InternalLogLevel.ERROR, e.getMessage());
					e.printStackTrace();
				}
			} else {
				try {
					this.scuh.childChannelClosed(ctx, childChannelStateEvent);
				} catch (final Exception e) {
					KarmaDecoder.log(InternalLogLevel.ERROR, e.getMessage());
					e.printStackTrace();
				}
			}
		} else if (channelEvent instanceof ChannelStateEvent) {
			final ChannelStateEvent channelStateEvent = (ChannelStateEvent) channelEvent;
			switch (channelStateEvent.getState()) {
				case OPEN:
					if (Boolean.TRUE.equals(channelStateEvent.getValue())) {
						try {
							// Custom coding to make sure that all channels are
							// added to the channel group.
							this.channelOpenUpstream(ctx, channelStateEvent);
						} catch (final Exception e1) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e1
									.getMessage());
							e1.printStackTrace();
						}
					} else {
						try {
							this.scuh.channelClosed(ctx, channelStateEvent);
						} catch (final Exception e1) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e1
									.getMessage());
							e1.printStackTrace();
						}
					}
					break;
				case BOUND:
					if (channelStateEvent.getValue() != null) {
						try {
							this.scuh.channelBound(ctx, channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					} else {
						try {
							this.scuh.channelUnbound(ctx, channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					}
					break;
				case CONNECTED:
					if (channelStateEvent.getValue() != null) {
						try {
							// Custom coding to make sure that the exchanges are
							// appropriately encrypted with PKI.
							this.handshake(ctx, channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					} else {
						try {
							this.scuh.channelDisconnected(ctx,
									channelStateEvent);
						} catch (final Exception e) {
							KarmaDecoder.log(InternalLogLevel.ERROR, e
									.getMessage());
							e.printStackTrace();
						}
					}
					break;
				case INTEREST_OPS:
					try {
						this.scuh
								.channelInterestChanged(ctx, channelStateEvent);
					} catch (final Exception e) {
						KarmaDecoder
								.log(InternalLogLevel.ERROR, e.getMessage());
						e.printStackTrace();
					}
					break;
				default:
					ctx.sendUpstream(channelEvent);
			}
		} else if (channelEvent instanceof ExceptionEvent) {
			try {
				this.scuh.exceptionCaught(ctx, (ExceptionEvent) channelEvent);
			} catch (final Exception e) {
				KarmaDecoder.log(InternalLogLevel.ERROR, e.getMessage());
				e.printStackTrace();
			}
		} else {
			ctx.sendUpstream(channelEvent);
		}
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime
				* result
				+ ((this.allChannels == null) ? 0 : this.allChannels.hashCode());
		result = prime * result
				+ ((this.fileCount == null) ? 0 : this.fileCount.hashCode());
		result = prime
				* result
				+ ((this.fileWriters == null) ? 0 : this.fileWriters.hashCode());
		result = prime * result
				+ ((this.managers == null) ? 0 : this.managers.hashCode());
		result = prime * result
				+ ((this.receiver == null) ? 0 : this.receiver.hashCode());
		result = prime * result
				+ ((this.scdh == null) ? 0 : this.scdh.hashCode());
		result = prime * result
				+ ((this.scuh == null) ? 0 : this.scuh.hashCode());
		return result;
	}

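	/**
	 * Copies the readable bytes out of the buffer and converts them to an
	 * object.
	 *
	 * @param messageBuffer the received buffer; its reader index is advanced
	 * @return the object produced by <code>KarmaUtility.toObject</code>
	 */
	private Object convertChannelBuffer(final ChannelBuffer messageBuffer) {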
		final int readableBytes = messageBuffer.readableBytes();
		final byte[] readBytes = new byte[readableBytes];
		messageBuffer.readBytes(readBytes);
		final Object object = KarmaUtility.toObject(readBytes);
		return object;
	}

	private void decode(final Channel channel, final Object msg) {

		final ChannelBuffer msgBuf = (ChannelBuffer) msg;
		/*
		 * Converts the channel buffer bytes to an object.
		 */
		final Object object = this.convertChannelBuffer(msgBuf);
		/*
		 * Keeps track of how many managers and how many file stream chunks
		 * have been processed. This envisions future additions to
		 * functionality.
		 */
		if (object instanceof KarmaManager) {
			KarmaDecoder.managerCounter.incrementAndGet();
		} else if (object instanceof KarmaChunk) {
			KarmaDecoder.chunkCounter.incrementAndGet();
		}

		/*
		 * Uses different methods to process KarmaChunks versus KarmaManagers.
		 */
		if (object instanceof KarmaChunk) {
			/*
			 * Chunk processing.
			 */
			final KarmaChunk chunk = (KarmaChunk) object;
			this.processKarmaFileChunk(channel, chunk);
		} else if (object instanceof KarmaManager) {
			/*
			 * Manager processing.
			 */
			final KarmaManager manager = (KarmaManager) object;
			this.processKarmaManager(manager);
		} else {
			/*
			 * This should never happen. TODO - get a more sophisticated
			 * response to this unwanted alternative.
			 */
			KarmaDecoder.log(InternalLogLevel.ERROR,
					"Not a legitimate object type."); //$NON-NLS-1$
		}
	}

	/**
	 * Invoked when a {@link Channel} is open, bound to a local address, and
	 * connected to a remote address.
	 */
	private void handshake(final ChannelHandlerContext ctx,
			final ChannelStateEvent event) {

		// Get the SSL handler installed in the current pipeline.
		final KarmaSslHandler sslHandler = ctx.getPipeline().get(
				KarmaSslHandler.class);

		// Get notified when the SSL handshake is done. The listener is only
		// added when handshake() returned a future.
		try {
			final ChannelFuture handshakeFuture = sslHandler.handshake(event
					.getChannel());
			handshakeFuture.addListener(new ConnectionNotice(sslHandler));
		} catch (final SSLException e) {
			KarmaDecoder.log(InternalLogLevel.ERROR, (this.errors = e
					.getMessage()));
		}
	}

	/**
	 * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
	 * from a remote peer.
	 */
	private void messageReceivedUpstream(final MessageEvent event) {
		KarmaDecoder.readCounter.incrementAndGet();
		final Object originalMessage = event.getMessage();
		this.decode(event.getChannel(), originalMessage);
	}

	/**
	 * <p>
	 * This method processes data stream file chunks. A file output stream is
	 * created at the appropriate location in ~/.KARMA/.AUM. A Java NIO file
	 * channel is obtained from the file output stream and is saved in the
	 * <code>fileWriters</code> <code>Map</code> under the chunk's
	 * <code>UUID</code>, which is also the ID of the manager. N.B.: this
	 * assumes that the chunks will arrive in order.
	 * </p>
	 * <p>
	 * Eventually we should not assume that the chunks arrive in order and
	 * should write them according to a given logic. This will allow us to take
	 * care of divergent delivery scenarios.
	 * </p>
	 *
	 * @param channel the channel on which the chunk arrived
	 * @param chunk the file chunk to write
	 */
	private void processKarmaFileChunk(final Channel channel,
			final KarmaChunk chunk) {
		final byte[] chunkBytes = chunk.bytes;

		/*
		 * When the first file chunk arrives, create the file channel from the
		 * file output stream for later use.
		 */
		if (chunk.id == 1) {
			final String baseName = KarmaConstant.AUM + chunk.uuid;

			final File chunkFile = new File(baseName);

			if (!chunkFile.exists() && !chunkFile.mkdirs()) {
				KarmaDecoder.log(InternalLogLevel.ERROR,
						"Could not create directory " + baseName); //$NON-NLS-1$
			}

			final String toFileName = baseName + File.separatorChar
					+ chunk.fileName;
			try {
				this.fileWriters.put(chunk.uuid, new FileOutputStream(
						toFileName).getChannel());
			} catch (final FileNotFoundException e) {
				KarmaDecoder.log(InternalLogLevel.ERROR, e.getMessage());
				// Without a file channel there is nothing to write to.
				return;
			}
		}

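		/*
		 * For all chunks, wrap the bytes in a byte buffer (no copy) and write
		 * until the buffer is drained, since a single write call is not
		 * guaranteed to consume every byte.
		 */
		final ByteBuffer buffer = ByteBuffer.wrap(chunkBytes);

		try {
			final FileChannel writer = this.fileWriters.get(chunk.uuid);
			if (writer == null) {
				throw new IOException("No open file channel for " + chunk.uuid); //$NON-NLS-1$
			}
			while (buffer.hasRemaining()) {
				writer.write(buffer);
			}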

			/*
			 * When the chunk contains the end of the file, remove the file
			 * channel after writing and close it. This is sort of an
			 * independent check.
			 */
			if (chunk.fileLength == chunk.offset) {
				final FileChannel fileChannel = this.fileWriters
						.remove(chunk.uuid);
				fileChannel.close();
			}
		} catch (final IOException e) {
			KarmaDecoder.log(InternalLogLevel.ERROR, e.getMessage());
		}

		if (chunk.isLast) {
			final Integer fileNumber = this.fileCount.get(chunk.uuid);

			/*
			 * This value is NOT otiose. It should be kept. When the value is
			 * the first and last it does not get to be incremented.
			 */
			if (fileNumber == null) {
				this.fileCount.put(chunk.uuid, Integer.valueOf(1));
			}

			final Integer numberOfFiles = this.fileCount.get(chunk.uuid);
			/*
			 * Note that is NOT the number of CHUNKS but the number of FILES.
			 */
			int numberOfFilesReceived = numberOfFiles.intValue();

			final KarmaManager manager = this.managers.get(chunk.uuid);
			/*
			 * The manager may not have arrived yet, so check for null before
			 * asking it for its resources.
			 */
			final int numberOfManagerFileNames = (manager == null) ? -1
					: manager.getResources().size();

			/*
			 * We have now got all CHUNKS and all FILES.
			 */
			if ((manager != null)
					&& (numberOfManagerFileNames == numberOfFilesReceived)) {
				/*
				 * If all the chunks have arrived.
				 */
				this.managers.remove(chunk.uuid);
				this.fileCount.remove(chunk.uuid);
				this.fileWriters.remove(chunk.uuid);
				/*
				 * This is the end of the road for this transmission. At this
				 * point all the files and the manager have been received and
				 * the chunks have been written to the file by the file channel.
				 * The manager will now be sent to the application with all the
				 * requisite information to get to the file resources and to do
				 * whatever the application likes.
				 */
				this.receiver.update(manager);
			} else {
				/*
				 * Keep track of how many FILES have arrived.
				 */
				numberOfFilesReceived++;
				this.fileCount.put(chunk.uuid, Integer
						.valueOf(numberOfFilesReceived));
			}
		}

		buffer.clear();
	}

	/**
	 * <p>
	 * This method processes the manager objects.
	 * </p>
	 *
	 * @param manager
	 */
	private void processKarmaManager(final KarmaManager manager) {
		managersCount++;
		final UUID uuid = manager.getId();

		this.managers.put(uuid, manager);
	}

	static void log(InternalLogLevel level, String msg) {
		logger.log(level, msg);
	}
}
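
The Javadoc for processKarmaFileChunk notes that the code assumes chunks
arrive in order.  One possible way to drop that assumption is a
positional write, sketched below with the JDK only.  This assumes each
chunk carries the byte offset at which its data starts, which the code
above does not confirm; PositionalChunkWriter is a hypothetical name and
not part of Karma:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical helper, not Karma code: writes a chunk at its own file offset.
final class PositionalChunkWriter {
    /** Opens a channel that supports writing at arbitrary positions. */
    static FileChannel open(String fileName) throws IOException {
        return new RandomAccessFile(fileName, "rw").getChannel();
    }

    /** Writes bytes at the given offset; chunks may arrive in any order. */
    static void writeAt(FileChannel channel, long offset, byte[] bytes)
            throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long position = offset;
        while (buffer.hasRemaining()) {
            // The positional write does not move the channel's own position.
            position += channel.write(buffer, position);
        }
    }
}
```

Because every chunk lands at its own offset, the receiver only has to
track which chunks have arrived before closing the channel.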





